I. Configuration Details

Scenario: MySQL tables are synchronized to the Elasticsearch search engine in both full and incremental modes.

1. Downloads

  • Elasticsearch, version 6.3.2
  • Logstash, version 6.3.2
  • mysql-connector-java-5.1.13.jar (MySQL JDBC driver)
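The two Elastic packages can be fetched from the official artifact repository; the sketch below assumes a Linux host and the tar.gz distributions. The MySQL JDBC driver jar is assumed to be obtained separately (for example from Maven Central) and is referenced later by jdbc_driver_library.

    # Hedged download sketch for the 6.3.2 packages used in this article.
    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.tar.gz
    wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz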

2. Core Configuration

  • Installation path: /usr/local/logstash

  • Create a new configuration directory: sync-config (see the setup sketch below)
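A minimal setup sketch, assuming the Logstash tarball from the download step sits in the current directory together with the JDBC driver jar:

    # Unpack Logstash to the path used throughout this article.
    tar -zxvf logstash-6.3.2.tar.gz -C /usr/local/
    mv /usr/local/logstash-6.3.2 /usr/local/logstash
    # Create the sync configuration directory and drop the MySQL JDBC driver into it,
    # matching the jdbc_driver_library path in the configuration below.
    mkdir -p /usr/local/logstash/sync-config
    cp mysql-connector-java-5.1.13.jar /usr/local/logstash/sync-config/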

1) Full configuration file
/usr/local/logstash/sync-config/cicadaes.conf

    input {
      stdin {}
      # Incremental pull from the user table, once per minute
      jdbc {
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cicada?characterEncoding=utf8"
        jdbc_user => "root"
        jdbc_password => "root123"
        jdbc_driver_library => "/usr/local/logstash/sync-config/mysql-connector-java-5.1.13.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        jdbc_default_timezone => "Asia/Shanghai"
        statement_filepath => "/usr/local/logstash/sync-config/user_sql.sql"
        schedule => "* * * * *"
        type => "User"
        lowercase_column_names => false
        record_last_run => true
        use_column_value => true
        tracking_column => "updateTime"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "/usr/local/logstash/sync-config/user_last_time"
        clean_run => false
      }
      # Incremental pull from the log table, once per minute
      jdbc {
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cicada?characterEncoding=utf8"
        jdbc_user => "root"
        jdbc_password => "root123"
        jdbc_driver_library => "/usr/local/logstash/sync-config/mysql-connector-java-5.1.13.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        jdbc_default_timezone => "Asia/Shanghai"
        statement_filepath => "/usr/local/logstash/sync-config/log_sql.sql"
        schedule => "* * * * *"
        type => "Log"
        lowercase_column_names => false
        record_last_run => true
        use_column_value => true
        tracking_column => "updateTime"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "/usr/local/logstash/sync-config/log_last_time"
        clean_run => false
      }
    }
    filter {
      json {
        source => "message"
        remove_field => ["message"]
      }
    }
    output {
      # Route each event to its own index based on the type set in the input
      if [type] == "User" {
        elasticsearch {
          hosts => ["127.0.0.1:9200"]
          index => "cicada_user_search"
          document_type => "user_search_index"
        }
      }
      if [type] == "Log" {
        elasticsearch {
          hosts => ["127.0.0.1:9200"]
          index => "cicada_log_search"
          document_type => "log_search_index"
        }
      }
    }
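After saving the file, the pipeline syntax can be validated before the first run using Logstash's built-in configuration check:

    # Parse the configuration and exit without starting the pipeline.
    /usr/local/logstash/bin/logstash -f /usr/local/logstash/sync-config/cicadaes.conf --config.test_and_exit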

2) SQL files

  • user_sql.sql

    SELECT
      id,
      user_name userName,
      user_phone userPhone,
      create_time createTime,
      update_time updateTime
    FROM c_user
    WHERE update_time > :sql_last_value

  • log_sql.sql

    SELECT
      id,
      param_value paramValue,
      request_ip requestIp,
      create_time createTime,
      update_time updateTime
    FROM c_log
    WHERE update_time > :sql_last_value
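Before wiring these statements into Logstash, it can help to run the incremental query once by hand. The sketch below is only a sanity check under assumptions: it substitutes a fixed timestamp for :sql_last_value and reuses the credentials and table names from the configuration above.

    # Run the user query manually with a placeholder timestamp in place of :sql_last_value.
    mysql -uroot -proot123 cicada -e \
      "SELECT id, user_name userName, update_time updateTime FROM c_user WHERE update_time > '1970-01-01 00:00:00' LIMIT 5;"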

3) Configuration parameter notes

  • input parameters

    1. statement_filepath: path of the SQL file whose statement is executed
    2. schedule: cron-style schedule; here the query runs once every minute
    3. type: event type, used as a routing tag when writing to ES
    4. lowercase_column_names: whether column names are forced to lower case
    5. record_last_run: persist the state of the last run to last_run_metadata_path (see the sketch after this list)
    6. use_column_value: track a column value instead of the last run time
    7. tracking_column: the updateTime column whose latest value is recorded and substituted as :sql_last_value, so only incremental rows are picked up
    8. tracking_column_type: data type of the tracking column
  • output parameters

    1. hosts: ES service address
    2. index: index name, roughly analogous to a database name
    3. document_type: type name, roughly analogous to a table name
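The incremental checkpoint lives in the files named by last_run_metadata_path. The operational sketch below assumes the paths from the configuration above; the file contents themselves are managed by the jdbc input plugin.

    # Inspect the last recorded updateTime for the user pipeline.
    cat /usr/local/logstash/sync-config/user_last_time
    # To force a full re-import, stop Logstash, delete the checkpoint files, then restart.
    rm -f /usr/local/logstash/sync-config/user_last_time \
          /usr/local/logstash/sync-config/log_last_time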

3. Start the process

    /usr/local/logstash/bin/logstash -f /usr/local/logstash/sync-config/cicadaes.conf
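For an unattended run, a hedged variant is to launch Logstash in the background and keep its output in a log file:

    # Run in the background; the sync.log path here is only an example.
    nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/sync-config/cicadaes.conf > /usr/local/logstash/sync.log 2>&1 &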

II. ES Client Tool

1. Download the software
kibana-6.3.2-windows-x86_64
2. Modify the configuration
kibana-6.3.2-windows-x86_64\config\kibana.yml
Add the following setting:

    elasticsearch.url: "http://127.0.0.1:9200"

3. Double-click to start
kibana-6.3.2-windows-x86_64\bin\kibana.bat
4. Access address

    http://localhost:5601
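Once Logstash has run for a minute or two, the sync result can be checked in Kibana's Dev Tools or directly against the ES HTTP API; the index names below come from the output section of the configuration.

    # List indices and confirm the two sync targets exist.
    curl 'http://127.0.0.1:9200/_cat/indices?v'
    # Fetch one synced document from the user index.
    curl 'http://127.0.0.1:9200/cicada_user_search/_search?pretty&size=1'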
