使用 Logstash 将 MySQL 数据同步到 Elasticsearch

1. 创建与 MySQL 表结构对应的 Elasticsearch 索引(mapping),例如(其中 title、summary 使用了 ik_max_word 分词器,需要预先安装 IK 中文分词插件):

  PUT /video
  {
    "settings": {
      "number_of_replicas": 0,
      "number_of_shards": 1
    },
    "mappings": {
      "properties": {
        "id": {
          "type": "long"
        },
        "title": {
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "summary": {
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "cover_img": {
          "type": "keyword"
        },
        "view_num": {
          "type": "integer"
        },
        "price": {
          "type": "double"
        },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "update_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "online": {
          "type": "integer"
        },
        "point": {
          "type": "double"
        }
      }
    }
  }

2. 编写 Logstash 配置文件(注意:Logstash 配置文件中的注释只能以 # 开头,不支持 //):

  input {
    jdbc {
      # Path to the MySQL JDBC driver jar
      jdbc_driver_library => "/usr/local/elasticsearch/mysql-connector-java-5.1.47.jar"
      # Driver class for Connector/J 5.x (Connector/J 8.x uses com.mysql.cj.jdbc.Driver)
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://192.168.233.100:3306/mytest"
      jdbc_user => "root"
      jdbc_password => "xxxx"
      # Cron syntax: run the statement once every minute
      schedule => "* * * * *"
      # true: ignore the persisted :sql_last_value on startup, so the first run after
      # every (re)start starts from 1970-01-01 and re-reads the whole table
      clean_run => true
      # Time zone the database timestamps (and :sql_last_value) are interpreted in
      jdbc_default_timezone => "Asia/Shanghai"
      # SQL to execute. :sql_last_value is the time of the previous run, so each run
      # fetches rows whose update_time changed since then. Dates are formatted as
      # 'yyyy-MM-dd HH:mm:ss' strings to match the "format" declared in the index mapping.
      statement => "select id,title,summary,cover_img,view_num,price,DATE_FORMAT(update_time,'%Y-%m-%d %T') as update_time,DATE_FORMAT(create_time,'%Y-%m-%d %T') as create_time,`online`,point from video where update_time>:sql_last_value and update_time<NOW() order by update_time desc"
    }
  }
  filter {
    # Shift @timestamp forward by 8 hours so that, rendered as UTC, it reads as
    # UTC+8 (Asia/Shanghai) wall-clock time. `.localtime` alone does not change the
    # instant; the `+ 8*60*60` does.
    # NOTE(review): the stored @timestamp is then no longer the true UTC instant, so
    # time-zone-aware viewers will likely display it 8 hours ahead.
    ruby {
      code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    # Copy the shifted value back into @timestamp
    ruby {
      code => "event.set('@timestamp',event.get('timestamp'))"
    }
    # Drop the temporary helper field so it is not indexed
    mutate {
      remove_field => ["timestamp"]
    }
    # Disabled on purpose: update_time/create_time arrive as plain strings (DATE_FORMAT
    # in the statement), so `.time` is not available on them and no shift is needed.
    # ruby {
    # code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)"
    # }
    # ruby {
    # code => "event.set('create_time', event.get('create_time').time.localtime + 8*60*60)"
    # }
  }
  output {
    elasticsearch {
      # No port given, so the plugin's default port (9200) applies
      hosts => ["192.168.233.100"]
      # Target Elasticsearch index
      index => "video"
      # Use the MySQL primary key as the document _id, so a re-synced row overwrites
      # its existing document instead of creating a duplicate
      document_id => "%{id}"
    }
  }

3. 启动 Logstash

在 Logstash 的 bin 目录下执行 ./logstash -f 配置文件路径,例如:./logstash -f ../conf/xxx.conf。启动前可以先执行 ./logstash -f ../conf/xxx.conf -t 检查配置文件语法是否正确。

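启动后就可以看到 MySQL 中的数据同步到 Elasticsearch 中了。需要注意的是,这种方式是靠定时器(schedule)轮询 MySQL 来刷新数据的:上面的配置每分钟执行一次查询,按 update_time 拉取上次执行之后新增或修改的记录,因此数据会有最长约一个轮询周期的延迟,达不到近实时;而且 MySQL 中被删除的记录不会被同步删除。所以它更适合做全量导入,不适合对实时性要求较高的增量更新场景。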