下面就说下具体怎么配置。
    1.先在安装目录bin下面(一般都是在bin下面)新建两个文件jdbc.conf和jdbc.sql
    image.png
    2.配置jdbc.conf

    1. input {
    2. stdin {
    3. }
    4. jdbc {
    5. # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
    6. jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
    7. jdbc_user => "root"
    8. jdbc_password => "123456"
    9. # 下载连接数据库的驱动包,建议使用绝对地址
    10. jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar"
    11. jdbc_driver_class => "com.mysql.jdbc.Driver"
    12. jdbc_paging_enabled => "true"
    13. jdbc_page_size => "50000"
    14. codec => plain { charset => "UTF-8"}
    15. #使用其它字段追踪,而不是用时间
    16. #use_column_value => true //这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true, 切记切记切记,我是用update_time来追踪的
    17. #追踪的字段
    18. tracking_column => update_time
    19. record_last_run => true
    20. #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这里说是必须指定初始值,我没指定默认是1970-01-01 080000
    21. last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" //这里的lastrun文件夹和.logstash_jdbc_last_run是自己创建的
    22. jdbc_default_timezone => "Asia/Shanghai" //设置时区
    23. #statement => SELECT * FROM goods WHERE update_time > :last_sql_value //这里要说明一下如果直接写sql语句,前面这种写法肯定不对的
    24.                                                 ,加上引号也试过也不对,所以我直接写在jdbc.sql文件中
    25. statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql"
    26. #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    27. clean_run => false
    28. # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
    29. schedule => "* * * * *"
    30. type => "std"
    31. }
    32. }
    33. filter {
    34. json {
    35. source => "message"
    36. remove_field => ["message"]
    37. }
    38. }
    39. output {
    40. elasticsearch {
    41. # 要导入到的Elasticsearch所在的主机
    42. hosts => "127.0.0.1:9200"
    43. # 要导入到的Elasticsearch的索引的名称
    44. index => "goods"
    45. # 类型名称(类似数据库表名)
    46. document_type => "spu"
    47. # 主键名称(类似数据库主键)
    48. document_id => "%{id}"
    49. }
    50. stdout {
    51. # JSON格式输出
    52. codec => json_lines
    53. }
    54. }

    3.配置jdbc.sql

    1. select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value

    4.我们来看下 .logstash_jdbc_last_run文件中的内容(网上讲述该配置的时候都没讲到里面具体的内容写法,导致很多人很迷惑,其中我就是)
    image.png
    前面的—-具体什么意思,我也不太清楚。

    5.启动jdbc.conf配置,开始同步数据
    image.png

    第一次:因为时间是从1970年开始的所以会全部同步一遍。相当于全量同步了
    image.png
    从第二次开始,会从上次最新的一次时间同步,既新增和修改都会同步
    image.png

    遇到的问题:
    1.ES中8小时时差的问题?

    1.  filter {
    2. json {
    3. source => "message"
    4. remove_field => ["message"]
    5. }
    6.    // date类型不能省略,不然会报错, 就是把当前字段+8小时后赋值给新的字段,然后再取新字段的值赋值给老的字段,再把新的字段删除
    7. date {
    8. match => ["message","UNIX_MS"]
    9. target => "@timestamp"
    10. }
    11. ruby {
    12. code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    13. }
    14. ruby{
    15. code => "event.set('@timestamp',event.get('timestamp'))"
    16. }
    17. mutate{
    18. remove_field => ["timestamp"]
    19. }
    20. date {
    21. match => ["message","UNIX_MS"]
    22. target => "create_time"
    23. }
    24. ruby {
    25. code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
    26. }
    27. ruby {
    28. code => "event.set('create_time',event.get('@create_time'))"
    29. }
    30. mutate {
    31. remove_field => ["@create_time"]
    32. }
    33. date {
    34. match => ["message","UNIX_MS"]
    35. target => "update_time"
    36. }
    37. ruby {
    38. code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
    39. }
    40. ruby {
    41. code => "event.set('update_time',event.get('@update_time'))"
    42. }
    43. mutate {
    44. remove_field => ["@update_time"]
    45. }
    46. }

    解决方法:从源头解决问题
    在jdbc.conf配置文件中只要是有关时间的字段都手动+8小时
    总结:主要是配置,有什么问题,先检查配置文件。
    https://github.com/childe/gohangout
    https://github.com/Ringloop/mr-plow