1. Introduction

Logstash is an open-source data collection engine with pipelining capabilities. It is a collection and processing framework for logs, but not limited to logs: it can dynamically gather data from scattered sources, normalize the format, and ship the result to a destination of your choice.
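Every Logstash pipeline has the same three-stage shape, and the sync configuration later in this article follows it too. A skeleton of the structure (the filter stage is optional and is not used in this article):

    input {    # where events come from: stdin, files, JDBC, ...
      ...
    }
    filter {   # optional per-event transformations
      ...
    }
    output {   # where events go: Elasticsearch, stdout, ...
      ...
    }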

2. Procedure

1. Download Logstash (this guide puts it under /es):

       [root@localhost es]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.tar.gz

   (Note: if this fails with "-bash: wget: command not found", install wget first: yum -y install wget)

2. Extract the archive:

       [root@localhost es]# tar -zxvf logstash-7.3.2.tar.gz

3. Adjust the JVM heap (the default is 1 GB; if the machine has enough memory, you can leave it unchanged):

       [root@localhost config]# vi jvm.options
       -Xms256m
       -Xmx256m

4. Check that Logstash runs. Change into /es/logstash-7.3.2/bin and execute:

       [root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'

   (Note: if this fails with "Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME.", install a JDK first.)

5. Install the JDBC input and Elasticsearch output plugins. Press Ctrl+C to stop the test run, then run the two commands below (a verification command follows this list):

   a. Install the JDBC input plugin (this command takes quite a while):

       [root@localhost logstash-7.3.2]# bin/logstash-plugin install logstash-input-jdbc

   b. Install the Elasticsearch output plugin:

       [root@localhost logstash-7.3.2]# bin/logstash-plugin install logstash-output-elasticsearch

6. Prepare the mysql-connector-java driver jar and place it under /es/logstash-7.3.2; the jdbc_driver_library path in the configuration below must point at the jar's actual location.

7. In the config directory, write the sync configuration file logstash.conf:
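(Optional) To confirm that the two plugins from step 5 were installed, list them; bin/logstash-plugin list ships with Logstash, though the exact output varies by version:

    [root@localhost logstash-7.3.2]# bin/logstash-plugin list | grep jdbc
    [root@localhost logstash-7.3.2]# bin/logstash-plugin list | grep elasticsearch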

logstash.conf:

    input {
      jdbc {
        type => "product"
        # MySQL JDBC connection settings
        jdbc_connection_string => "jdbc:mysql://49.232.91.87:3306/ec-goods?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "oracle!123"
        # Path to the MySQL JDBC driver jar. It must be correct, otherwise
        # Logstash reports "com.mysql.jdbc.Driver could not be loaded"
        jdbc_driver_library => "/es/logstash-7.3.2/mysql-connector-java-5.1.46.jar"
        # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # enable paged queries
        jdbc_paging_enabled => true
        jdbc_page_size => "50000"
        # The sync SQL statement. Aliasing the columns like this keeps the field
        # names consistent with camel-cased entity properties:
        statement => "select id,name,description,price,stock,level1_id as level1Id,level2_id as level2Id,level3_id as level3Id,main_img as mainImg,sub_imgs as subImgs,status,create_time as createTime,update_time as updateTime from product where update_time >= :sql_last_value order by update_time asc"
        # cron-style schedule, here once per minute (minute hour day-of-month month day-of-week)
        schedule => "* * * * *"
        # whether to force all column names to lowercase
        lowercase_column_names => false
        # Whether to record the last run. If true, the last value of the tracked
        # column is saved to the file named by last_run_metadata_path.
        record_last_run => true
        # Whether to track a specific column. If record_last_run is true, this can be
        # set to true to track a column of your choice; otherwise the run timestamp
        # is tracked by default.
        use_column_value => true
        # When use_column_value is true, the database column to track; it must be
        # monotonically increasing. If the column is aliased in the SQL, use the alias here.
        tracking_column => "updateTime"
        # type of the tracked column
        tracking_column_type => "timestamp"
        # file that stores the most recent tracked value
        last_run_metadata_path => "record_last_run_product"
        # Whether to clear the last_run_metadata_path record. If true, every run
        # starts from scratch and re-reads all database rows.
        clean_run => false
      }
      jdbc {
        type => "category"
        jdbc_connection_string => "jdbc:mysql://49.232.91.87:3306/ec-goods?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "oracle!123"
        jdbc_driver_library => "/es/logstash-7.3.2/mysql-connector-java-5.1.46.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => true
        jdbc_page_size => "50000"
        statement => "select id,name,parent_id as parentId,level,icon,status,create_time as createTime,update_time as updateTime from category where update_time >= :sql_last_value order by update_time asc"
        schedule => "* * * * *"
        lowercase_column_names => false
        record_last_run => true
        use_column_value => true
        tracking_column => "updateTime"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "record_last_run_category"
        clean_run => false
      }
    }
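This is where the incremental sync happens: after every query, the jdbc input writes the newest updateTime it saw into the last_run_metadata_path file, and on the next scheduled run it substitutes that stored value for :sql_last_value. Illustratively (the timestamp below is hypothetical, and most columns are elided), the product statement then effectively executes as:

    select id, name, ..., update_time as updateTime
    from product
    where update_time >= '2019-09-25 12:00:00'
    order by update_time asc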
The output section of the same file routes each event to its own index, based on the type field set by the corresponding jdbc input:

    output {
      if [type] == "product" {
        elasticsearch {
          hosts => ["192.168.180.110:9200"]
          # index name, user-defined (roughly comparable to a database)
          index => "product"
          # the id column of the source table becomes the document id in the index
          document_id => "%{id}"
        }
      }
      if [type] == "category" {
        elasticsearch {
          hosts => ["192.168.180.110:9200"]
          index => "category"
          document_id => "%{id}"
        }
      }
      # debug output; comment this out for production runs
      stdout {
        codec => json_lines
      }
    }
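Before starting the full sync, it can save a failed startup to have Logstash check the file first; --config.test_and_exit (short form -t) is a standard CLI flag that only validates the configuration syntax and then exits:

    [root@localhost logstash-7.3.2]# ./bin/logstash -f ./config/logstash.conf --config.test_and_exit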
8. Start Logstash from the /es/logstash-7.3.2 directory:

       [root@localhost logstash-7.3.2]# ./bin/logstash -f ./config/logstash.conf

   (Note: if this fails with "Expected one of #, input, filter, output at line 1, column 1 (byte 1) after", re-save logstash.conf as UTF-8 with BOM.)

9. To have the MySQL data synced by Logstash analyzed with the IK Chinese analyzer, create the indices in Kibana before the first sync and set the analyzer of the Chinese text fields to ik_max_word (see the Testing section below). Test that IK analysis is available:

       GET _analyze
       {
         "analyzer": "ik_max_word",
         "text": "中华人民共和国"
       }
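If the _analyze call fails because the analyzer is unknown, IK is not installed yet: it is a third-party plugin (medcl/elasticsearch-analysis-ik) whose version must match the Elasticsearch version. A sketch of the installation, assuming Elasticsearch 7.3.2 and that project's usual release naming (restart the node afterwards):

    [root@localhost elasticsearch-7.3.2]# ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.3.2/elasticsearch-analysis-ik-7.3.2.zip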

3. Testing

    # set the Chinese text fields of the indices to the IK analyzer
    # view an index's mapping
    GET product/_mapping
    GET category/_mapping

    # create the indices
    PUT product
    PUT category

    # set the index mappings
    POST product/_mapping
    {
      "properties": {
        "@timestamp": { "type": "date" },
        "@version": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "createTime": { "type": "date" },
        "description": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "id": { "type": "long" },
        "level1Id": { "type": "long" },
        "level2Id": { "type": "long" },
        "level3Id": { "type": "long" },
        "mainImg": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "name": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "price": { "type": "float" },
        "status": { "type": "long" },
        "stock": { "type": "long" },
        "type": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "updateTime": { "type": "date" }
      }
    }

    POST category/_mapping
    {
      "properties": {
        "@timestamp": { "type": "date" },
        "@version": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "createTime": { "type": "date" },
        "icon": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "id": { "type": "long" },
        "level": { "type": "long" },
        "name": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "parentId": { "type": "long" },
        "status": { "type": "long" },
        "type": {
          "type": "text",
          "analyzer": "ik_max_word",
          "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }
        },
        "updateTime": { "type": "date" }
      }
    }

    # delete the last_run_metadata_path file, then start Logstash to re-sync all the data
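Once Logstash has completed at least one sync cycle, a quick search confirms both the data flow and the IK analysis. The query below is only an illustration; replace the search term with a word that actually occurs in your product names:

    GET product/_search
    {
      "query": {
        "match": { "name": "手机" }
      }
    }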