Syncing MySQL data to Elasticsearch with Logstash
1. Create the Elasticsearch index that maps the MySQL table, for example:
PUT /video
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "id":          { "type": "long" },
      "title":       { "type": "text", "analyzer": "ik_max_word" },
      "summary":     { "type": "text", "analyzer": "ik_max_word" },
      "cover_img":   { "type": "keyword" },
      "view_num":    { "type": "integer" },
      "price":       { "type": "double" },
      "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "online":      { "type": "integer" },
      "point":       { "type": "double" }
    }
  }
}
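Note that the `create_time` field uses the Java date pattern `yyyy-MM-dd HH:mm:ss`, so every document must supply dates in exactly that shape (the SQL statement in the next step formats MySQL dates to match). A minimal Python check, with a hypothetical sample document:

```python
from datetime import datetime

# The Java pattern "yyyy-MM-dd HH:mm:ss" corresponds to this strptime format.
ES_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Hypothetical document shaped like the mapping above.
doc = {
    "id": 1,
    "title": "sample video",
    "create_time": "2024-01-02 03:04:05",
    "online": 1,
    "point": 4.5,
}

# Raises ValueError if create_time does not match the mapping's date format.
parsed = datetime.strptime(doc["create_time"], ES_DATE_FORMAT)
print(parsed.year)  # → 2024
```

A date string in any other shape (e.g. ISO 8601 with a `T` separator) would be rejected at index time with a mapping parse error.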
2. Write the Logstash config file:
input {
  jdbc {
    # Path to the MySQL JDBC driver jar
    jdbc_driver_library => "/usr/local/elasticsearch/mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.233.100:3306/mytest"
    jdbc_user => "root"
    jdbc_password => "xxxx"
    schedule => "* * * * *"
    clean_run => true
    # Time zone used by the jdbc input
    jdbc_default_timezone => "Asia/Shanghai"
    # SQL statement to run against MySQL
    statement => "select id,title,summary,cover_img,view_num,price,DATE_FORMAT(update_time,'%Y-%m-%d %T') as update_time,DATE_FORMAT(create_time,'%Y-%m-%d %T') as create_time,`online`,point from video where update_time>:sql_last_value and update_time<NOW() order by update_time desc"
  }
}
filter {
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  mutate {
    remove_field => ["timestamp"]
  }
  # ruby {
  #   code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)"
  # }
  # ruby {
  #   code => "event.set('create_time', event.get('create_time').time.localtime + 8*60*60)"
  # }
}
output {
  elasticsearch {
    hosts => ["192.168.233.100"]
    # Target Elasticsearch index
    index => "video"
    # Map the Elasticsearch document id to the MySQL id
    document_id => "%{id}"
  }
}
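The pair of ruby filters exists because Logstash stores `@timestamp` in UTC; they shift it forward 8 hours so it reads as Asia/Shanghai wall-clock time. The same arithmetic, sketched in Python with an assumed example timestamp:

```python
from datetime import datetime, timedelta

# Equivalent of the ruby filter: add 8*60*60 seconds (8 hours) to the
# UTC @timestamp so it displays as Asia/Shanghai (UTC+8) wall-clock time.
def shift_to_shanghai(utc_ts: datetime) -> datetime:
    return utc_ts + timedelta(hours=8)

ts = datetime(2024, 1, 1, 16, 0, 0)   # 16:00 UTC
print(shift_to_shanghai(ts))           # → 2024-01-02 00:00:00 (next day in Shanghai)
```

Note this is a fixed offset, not a real time-zone conversion, which is fine for UTC+8 since China does not observe daylight saving time.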
3. Start Logstash:
./logstash -f <config file>    e.g.: ./logstash -f ../conf/xxx.conf
Once started, you can watch the MySQL data being synced into Elasticsearch. However, this setup is only suitable for full imports, not incremental updates: it pulls data on a fixed schedule, so it cannot achieve near-real-time sync.
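If incremental sync with Logstash is still desired, the jdbc input can persist `:sql_last_value` between runs instead of discarding it with `clean_run => true`, by tracking the `update_time` column. A sketch of the relevant options (the metadata path is an assumption to adapt):

```
input {
  jdbc {
    # ... connection settings as above ...
    schedule => "* * * * *"
    clean_run => false                 # keep the stored last-run value across restarts
    use_column_value => true           # track a column value instead of the run time
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    # File where Logstash persists the last seen update_time (assumed path)
    last_run_metadata_path => "/usr/local/elasticsearch/.video_last_run"
  }
}
```

This makes each scheduled run fetch only rows with `update_time > :sql_last_value`, though it is still polling-based and remains bounded by the schedule interval rather than truly real-time.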
