Note: before working through the steps below, make sure your Elasticsearch and Logstash environments are already set up.

References

https://www.yuque.com/wangyekun/hgflvh/xt1sts
https://blog.csdn.net/weixin_41753567/article/details/125497958
https://blog.51cto.com/ch3nnn/5483295

Overview

Logstash is a free and open server-side data processing pipeline that ingests data from multiple sources, transforms it, and then ships it to your favorite "stash", integrating with a wide range of deployments. It offers a large collection of plugins for parsing, enriching, transforming, and buffering data from many sources. If your data needs processing that Beats does not provide, add Logstash to the deployment.
This tool is not limited to MySQL-to-ES data sync; other common use cases include a log pipeline (Logstash collects, processes, and forwards logs to Elasticsearch for storage, displayed in Kibana) and ELK log analysis (Elasticsearch + Logstash + Kibana).
It can do both a full sync of existing data and an incremental sync of new data, and it places no particular version restrictions on MySQL or ES; you only need a JDBC driver and plugins that match your versions.

Enable the binary log (binlog)

Before using the binlog, first confirm whether MySQL has it enabled, using the following commands:

```bash
docker exec -it mysql /bin/bash
mysql -uroot -p123456
```

```sql
SHOW VARIABLES LIKE 'log_bin';
```
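If the binlog is already enabled, the query returns ON (the screenshot below shows the same check); a sketch of the expected output:

```sql
-- +---------------+-------+
-- | Variable_name | Value |
-- +---------------+-------+
-- | log_bin       | ON    |
-- +---------------+-------+
-- A Value of OFF means the binlog still needs to be enabled (next step).
```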

1664889880987.png
What if the binlog is not enabled? In that case we have to enable it by hand: edit MySQL's my.cnf file, which usually lives at /etc/my.cnf, and add the following under the [mysqld] section:

```bash
cd /var/etc/mysql/
cat my.cnf
```

```ini
# Binlog file location
log_bin=/var/lib/mysql/bin-log
# Binlog index file location
log_bin_index=/var/lib/mysql/mysql-bin.index
# Purge binlogs older than 30 days
expire_logs_days=30
# Unique server ID within the MySQL replication topology
server_id=0002
# Binlog format
binlog_format=ROW
```
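The settings only take effect after MySQL restarts. A minimal sketch, assuming the container is named mysql as in the commands above:

```bash
# Restart the container so the my.cnf changes take effect
docker restart mysql

# Re-run the check; the Value column should now read ON
docker exec -it mysql mysql -uroot -p123456 -e "SHOW VARIABLES LIKE 'log_bin';"
```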

Create the database and table

Create a database named es_test and a table named cap.published; the script to run is below.

```sql
/*
 Navicat MySQL Data Transfer

 Source Server         : docker.mysql8
 Source Server Type    : MySQL
 Source Server Version : 80027
 Source Host           : localhost:3306
 Source Schema         : es_test

 Target Server Type    : MySQL
 Target Server Version : 80027
 File Encoding         : 65001

 Date: 05/10/2022 11:16:20
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cap.published
-- ----------------------------
DROP TABLE IF EXISTS `cap.published`;
CREATE TABLE `cap.published` (
  `Id` bigint NOT NULL,
  `Version` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `Name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL,
  `Retries` int NULL DEFAULT NULL,
  `Added` datetime NOT NULL,
  `ExpiresAt` datetime NULL DEFAULT NULL,
  `StatusName` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  PRIMARY KEY (`Id`) USING BTREE,
  INDEX `IX_ExpiresAt`(`ExpiresAt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of cap.published
-- ----------------------------
INSERT INTO `cap.published` VALUES (2163263118141927425, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 0, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263124588572673, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 0, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263128052154369, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 0, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
SET FOREIGN_KEY_CHECKS = 1;
```

1664939901277.png

```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cap.published
-- ----------------------------
DROP TABLE IF EXISTS `cap.published`;
CREATE TABLE `cap.published` (
  `Id` bigint NOT NULL,
  `Version` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `Name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL,
  `Retries` int NULL DEFAULT NULL,
  `Added` datetime NOT NULL,
  `ExpiresAt` datetime NULL DEFAULT NULL,
  `StatusName` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  PRIMARY KEY (`Id`) USING BTREE,
  INDEX `IX_ExpiresAt`(`ExpiresAt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
SET FOREIGN_KEY_CHECKS = 1;

-- ----------------------------
-- Records of cap.published
-- ----------------------------
INSERT INTO `cap.published` VALUES (1, 'v1', 'mysql5', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 200, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2, 'v1', 'mysql6', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 230, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (3, 'v1', 'mysql7', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 115, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263118141927425, 'v1', 'sample.rabbitmq.mysql1', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 100, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263124588572673, 'v1', 'sample.rabbitmq.mysql2', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 200, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263128052154369, 'v1', 'sample.rabbitmq.mysql3', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 400, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
```
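Before wiring up Logstash, it is worth confirming the sample rows are actually in place; a quick check:

```sql
-- Should report the number of rows inserted above
SELECT COUNT(*) FROM `cap.published`;
```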

Configure Logstash

Configure Logstash's core pipeline file, specifying the data source under input and the output service under output. First check which MySQL version you are running, so a matching JDBC driver can be chosen:

```sql
SELECT version();
```

Download the MySQL connector driver that matches your own MySQL version; mine is mysql-connector-java-8.0.27.zip. Unpack it and place the jar in the /usr/share/logstash/pipeline directory, as sketched below.
Driver download: MySQL Community Downloads
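A minimal sketch of getting the driver into the container, assuming the zip was downloaded to the Docker host and the container is named logstash:

```bash
# On the Docker host: unpack the driver and copy the jar into the container
unzip mysql-connector-java-8.0.27.zip
docker cp mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar \
  logstash:/usr/share/logstash/pipeline/
```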

mysql-es.conf

Create the mysql-es.conf file in Logstash's config directory (/usr/share/logstash/config); Logstash will read data from MySQL at the configured address and write it into an ES index.

```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi mysql-es.conf
```
```ruby
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.3.40:3306/es_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"
    # the user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => "123456"
    # the path to our downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-8.0.27.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone => "Asia/Shanghai"
    #record_last_run => "true"
    #use_column_value => "true"
    #tracking_column => "change_date"
    # if the tracked column is a time field (e.g. updateTime), its type must be set to timestamp
    #tracking_column_type => "timestamp"
    #statement => "SELECT Id,`Name` as CName,Content,Added FROM `cap.published` as cap_published"
    statement => "SELECT * FROM `cap.published` as cap_published"
    # schedule: run the query once per minute
    schedule => "* * * * *"
  }
}
output {
  elasticsearch {
    # ES address(es) and port
    #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
    hosts => "192.168.3.40:9200"
    manage_template => false
    user => "longfc"
    password => "lfc123456"
    # ES index name
    index => "cap_published"
    # the jdbc input lowercases column names, so the Id column arrives as "id"
    document_id => "%{id}"
  }
  stdout {
    # log output
    codec => json_lines
  }
}
```
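The commented-out tracking options in the input above are what turn this from a repeated full dump into an incremental sync. A sketch of the jdbc input with them enabled, assuming the table's `Added` column is used as the tracking column (the original comments reference a hypothetical `change_date` column instead):

```ruby
jdbc {
  # ...connection and driver settings as above...
  record_last_run => true
  use_column_value => true
  # the jdbc input lowercases column names by default, so use "added" here
  tracking_column => "added"
  # required when the tracked column is a time field
  tracking_column_type => "timestamp"
  # :sql_last_value holds the last tracked value Logstash saw on the previous run
  statement => "SELECT * FROM `cap.published` WHERE Added > :sql_last_value"
  schedule => "* * * * *"
}
```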

Notes:

  • The es_test database and the cap.published table must be created in advance.
  • The Elasticsearch user must be created in Kibana beforehand, with the appropriate roles and permissions assigned.
  • In theory the index does not have to be created up front, since ES can create it automatically (provided the SQL statement aliases the table to a name that is valid as an ES index); in practice, the sync only succeeded after creating the index manually first, as in the curl sketch after this list.
  • Index names must be all lowercase.
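Per the note above about creating the index first, a minimal sketch using curl (host and credentials taken from the output section; ES infers the field mappings on the first write):

```bash
# Pre-create the index; index names must be lowercase
curl -u longfc:lfc123456 -X PUT "http://192.168.3.40:9200/cap_published"
```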

logstash.yml

```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi logstash.yml
```

```yaml
http.host: "0.0.0.0"
# xpack.monitoring.elasticsearch.hosts: [ "http://192.168.3.40:9200" ]
# xpack.monitoring.elasticsearch.username: "elastic"
# xpack.monitoring.elasticsearch.password: "F3SXEhBp3Jysfd9OyhJX"
xpack.monitoring.enabled: false
```

pipelines.yml

```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
# Edit pipelines.yml
vi pipelines.yml
```

Point the main pipeline at the conf file created above (replacing the default path.config, since a pipeline can only have one):

```yaml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: main
  # path.config: "/usr/share/logstash/pipeline"
  path.config: "/usr/share/logstash/config/mysql-es.conf"
```
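Before restarting, the pipeline file can be syntax-checked inside the container with Logstash's built-in test flag; a sketch:

```bash
# Parse the config and exit without starting the pipeline
/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/mysql-es.conf --config.test_and_exit
```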

Restart the container

```bash
# Exit the container
exit
# Restart it
docker restart logstash
```
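Once restarted, the stdout output configured above makes it easy to confirm the schedule is firing by watching the container logs:

```bash
# Each minute, the executed SELECT and the resulting JSON documents should appear
docker logs -f logstash
```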

Test

1664941200873.png

1664941219778.png
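The screenshots above show the synced documents in Kibana; the same check can be made directly against ES with curl (credentials from the output section):

```bash
# List the documents that landed in the cap_published index
curl -u longfc:lfc123456 "http://192.168.3.40:9200/cap_published/_search?pretty"
```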