Note: before following the steps below, make sure your Elasticsearch and Logstash environments are already set up.
References
https://www.yuque.com/wangyekun/hgflvh/xt1sts
https://blog.csdn.net/weixin_41753567/article/details/125497958
https://blog.51cto.com/ch3nnn/5483295
Overview
Logstash is a free and open server-side data processing pipeline that ingests data from multiple sources, transforms it, and then sends it to your favorite "stash". It integrates with a wide range of deployments and provides a large number of plugins to help you parse, enrich, transform, and buffer data from various sources. If your data requires processing that is not available in Beats, add Logstash to your deployment.
This tool is not limited to MySQL-to-ES data synchronization. Other common scenarios include log collection (Logstash collects, processes, and forwards logs to Elasticsearch for storage, with Kibana for display) and ELK log analysis (Elasticsearch + Logstash + Kibana).
It can do both a full sync of existing data and an incremental sync of new data, and there is no particular version restriction on MySQL or ES; you just need a driver that matches your versions.
Enable the binary log
Before using the binlog, first confirm whether MySQL has it enabled. We can check with the following commands:
```bash
# Enter the MySQL container and log in
docker exec -it mysql /bin/bash
mysql -uroot -p123456
```
```sql
SHOW VARIABLES LIKE 'log_bin';
```
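If the binary log is enabled, the query returns something like the following (Value shows OFF when it still needs to be turned on):
```
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
```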

What if the binlog is not enabled? In that case we have to enable it manually by editing MySQL's my.cnf file, usually located at /etc/my.cnf (or under /etc/mysql/, depending on the installation). Write the following under the [mysqld] section:
```bash
cd /etc/mysql/
cat my.cnf
```
```ini
# Binlog file location
log_bin=/var/lib/mysql/bin-log
# Binlog index file location
log_bin_index=/var/lib/mysql/mysql-bin.index
# Purge binlogs older than 30 days
expire_logs_days=30
# Unique ID of this MySQL server within the cluster
server_id=0002
# Binlog logging mode
binlog_format=ROW
```
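The settings only take effect after MySQL restarts; with Docker that means restarting the container:
```bash
# Leave the container shell, then restart the MySQL container
exit
docker restart mysql
```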
Create the database and table
Create a database named es_test and a table named cap.published. The script is below:
```sql
/*
 Navicat MySQL Data Transfer

 Source Server         : docker.mysql8
 Source Server Type    : MySQL
 Source Server Version : 80027
 Source Host           : localhost:3306
 Source Schema         : es_test

 Target Server Type    : MySQL
 Target Server Version : 80027
 File Encoding         : 65001

 Date: 05/10/2022 11:16:20
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cap.published
-- ----------------------------
DROP TABLE IF EXISTS `cap.published`;
CREATE TABLE `cap.published` (
  `Id` bigint NOT NULL,
  `Version` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `Name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL,
  `Retries` int NULL DEFAULT NULL,
  `Added` datetime NOT NULL,
  `ExpiresAt` datetime NULL DEFAULT NULL,
  `StatusName` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  PRIMARY KEY (`Id`) USING BTREE,
  INDEX `IX_ExpiresAt`(`ExpiresAt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of cap.published
-- ----------------------------
INSERT INTO `cap.published` VALUES (2163263118141927425, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 0, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263124588572673, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 0, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263128052154369, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 0, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');

SET FOREIGN_KEY_CHECKS = 1;
```

Re-running the structure script on its own drops and recreates the table:

```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cap.published
-- ----------------------------
DROP TABLE IF EXISTS `cap.published`;
CREATE TABLE `cap.published` (
  `Id` bigint NOT NULL,
  `Version` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `Name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `Content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL,
  `Retries` int NULL DEFAULT NULL,
  `Added` datetime NOT NULL,
  `ExpiresAt` datetime NULL DEFAULT NULL,
  `StatusName` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  PRIMARY KEY (`Id`) USING BTREE,
  INDEX `IX_ExpiresAt`(`ExpiresAt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;

SET FOREIGN_KEY_CHECKS = 1;
```

A larger set of sample records can then be loaded:

```sql
-- ----------------------------
-- Records of cap.published
-- ----------------------------
INSERT INTO `cap.published` VALUES (1, 'v1', 'mysql5', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 200, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2, 'v1', 'mysql6', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 230, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (3, 'v1', 'mysql7', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 115, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263118141927425, 'v1', 'sample.rabbitmq.mysql1', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 100, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263124588572673, 'v1', 'sample.rabbitmq.mysql2', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 200, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263128052154369, 'v1', 'sample.rabbitmq.mysql3', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 400, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
```
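A quick sanity check that the rows are in place (using the same container name and credentials as above):
```bash
docker exec mysql mysql -uroot -p123456 -e "SELECT COUNT(*) FROM es_test.\`cap.published\`;"
```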
Configure Logstash
Configure Logstash's core pipeline file, specifying the MySQL data source for input and the ES output service. The following steps are needed.
First, check your MySQL version so you can pick a matching JDBC driver:
```sql
SELECT version();
```
Download the MySQL connector driver matching your own MySQL version; mine is mysql-connector-java-8.0.27.zip. Download it, unzip it, and place the jar in the /usr/share/logstash/pipeline directory.
Driver download: MySQL Community Downloads
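A sketch of getting the jar into the container; the directory inside the zip is taken from the 8.0.27 distribution, so adjust it if your version differs:
```bash
# Unzip the driver and copy the jar into the Logstash container's pipeline directory
unzip mysql-connector-java-8.0.27.zip
docker cp mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar \
  logstash:/usr/share/logstash/pipeline/
```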
mysql-es.conf
Create the mysql-es.conf file under Logstash's config directory (/usr/share/logstash/config). Logstash will read data from MySQL and write index documents to ES according to this configuration.
```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi mysql-es.conf
```
```ruby
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.3.40:3306/es_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"
    # the user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => "123456"
    # the path to our downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-8.0.27.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone => "Asia/Shanghai"
    #record_last_run => "true"
    #use_column_value => "true"
    #tracking_column => "change_date"
    # if the tracking column is a datetime (e.g. updateTime), its type must be set to timestamp
    #tracking_column_type => "timestamp"
    #statement => "SELECT Id,`Name` as CName,Content,Added FROM `cap.published` as cap_published"
    statement => "SELECT * FROM `cap.published` as cap_published"
    # cron-style schedule -- run the query once every minute
    schedule => "* * * * *"
  }
}

output {
  elasticsearch {
    # ES address(es) and port
    #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
    hosts => "192.168.3.40:9200"
    manage_template => false
    user => "longfc"
    password => "lfc123456"
    # ES index name
    index => "cap_published"
    # the jdbc input lowercases column names by default, so `Id` arrives as %{id}
    document_id => "%{id}"
  }
  stdout {
    # log output
    codec => json_lines
  }
}
```
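The commented-out tracking settings above are the hook for incremental sync. A minimal sketch of that variant, assuming the `Added` column is used as the tracking value (the jdbc input lowercases column names, so it is tracked as `added`; the metadata path below is an assumption):
```ruby
# incremental variant of the jdbc input above (merge these into the jdbc { } block)
statement => "SELECT * FROM `cap.published` WHERE `Added` > :sql_last_value"
record_last_run => true
use_column_value => true
tracking_column => "added"
# `Added` is a datetime, so the tracking type must be timestamp
tracking_column_type => "timestamp"
# where the last seen value is persisted between runs (assumed path)
last_run_metadata_path => "/usr/share/logstash/config/.cap_published_last_run"
```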
Notes:
- The database es_test and the table cap.published must be created in advance.
- The Elasticsearch user must be created in Kibana beforehand, with the appropriate roles and permissions assigned.
- In principle the index does not need to be created up front: ES can create it automatically (provided the statement aliases the table to a name that is legal as an ES index). In my testing, however, the sync only succeeded after creating the index manually first; see the sketch after this list.
- Index names must be all lowercase.
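If auto-creation fails for you too, here is a minimal sketch for creating the index up front. The field types are my guess from the table columns (remember the jdbc input lowercases column names), so adjust them to your needs:
```bash
# Create the cap_published index with an explicit mapping (assumed field types)
curl -u longfc:lfc123456 -X PUT "http://192.168.3.40:9200/cap_published" \
  -H 'Content-Type: application/json' \
  -d '{
    "mappings": {
      "properties": {
        "version":    { "type": "keyword" },
        "name":       { "type": "keyword" },
        "content":    { "type": "text" },
        "retries":    { "type": "integer" },
        "added":      { "type": "date" },
        "expiresat":  { "type": "date" },
        "statusname": { "type": "keyword" }
      }
    }
  }'
```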
logstash.yml
```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi logstash.yml
```
```yaml
http.host: "0.0.0.0"
# xpack.monitoring.elasticsearch.hosts: [ "http://192.168.3.40:9200" ]
# xpack.monitoring.elasticsearch.username: "elastic"
# xpack.monitoring.elasticsearch.password: "F3SXEhBp3Jysfd9OyhJX"
xpack.monitoring.enabled: false
```
pipelines.yml
```bash
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
# Edit pipelines.yml
vi pipelines.yml
```
```yaml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  #path.config: "/usr/share/logstash/pipeline"
  path.config: "/usr/share/logstash/config/mysql-es.conf"
```
Restart the container
```bash
# Exit the container
exit
# Restart the container
docker restart logstash
```
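After the restart, watching the Logstash log is the quickest way to confirm the pipeline started and the scheduled query is firing:
```bash
docker logs -f logstash
```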
Testing
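With the schedule firing once per minute, documents should show up in the index shortly after the restart. A minimal check against ES, using the user and password configured in mysql-es.conf:
```bash
# Count the documents in the index
curl -u longfc:lfc123456 "http://192.168.3.40:9200/cap_published/_count?pretty"
# Inspect a couple of documents
curl -u longfc:lfc123456 "http://192.168.3.40:9200/cap_published/_search?pretty&size=2"
```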


