Note: before following the steps below, make sure your Elasticsearch and Logstash environments are already set up.
References
https://www.yuque.com/wangyekun/hgflvh/xt1sts
https://blog.csdn.net/weixin_41753567/article/details/125497958
https://blog.51cto.com/ch3nnn/5483295
Overview
Logstash is a free and open server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash". It integrates with a variety of deployments and ships with a large number of plugins for parsing, enriching, transforming, and buffering data from all kinds of sources. If your data needs processing that Beats cannot provide, add Logstash to your deployment.
This tool is not limited to syncing data from MySQL to ES. Other common scenarios include log collection (Logstash collects, processes, and forwards logs to Elasticsearch for storage, with Kibana for display) and ELK log analysis (Elasticsearch + Logstash + Kibana).
It can do both a full sync of existing data and an incremental sync of new data, and it places no hard version restrictions on MySQL or ES, as long as the JDBC driver and Logstash release match your setup.
Enable binlog
Before using the binlog, first check whether MySQL has it enabled, using the following commands:
docker exec -it mysql /bin/bash
mysql -uroot -p123456
SHOW VARIABLES LIKE 'LOG_BIN';
What if the binlog is not enabled? In that case we have to turn it on manually by editing MySQL's my.cnf file, usually located at /etc/my.cnf (or under /etc/mysql/ in many Docker images). Add the following under the [mysqld] section, then restart MySQL for the change to take effect:
cd /etc/mysql/
cat my.cnf
# Binlog file location
log_bin=/var/lib/mysql/bin-log
# Binlog index file location
log_bin_index=/var/lib/mysql/mysql-bin.index
# Purge binlogs older than 30 days
expire_logs_days=30
# Unique server ID within the replication cluster
server_id=0002
# Binlog logging format
binlog_format=ROW
Create the database and table
Create a database named es_test and a table named cap.published. The script below contains both the DDL and sample data:
/*
Navicat MySQL Data Transfer
Source Server : docker.mysql8
Source Server Type : MySQL
Source Server Version : 80027
Source Host : localhost:3306
Source Schema : es_test
Target Server Type : MySQL
Target Server Version : 80027
File Encoding : 65001
Date: 05/10/2022 11:16:20
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cap.published
-- ----------------------------
DROP TABLE IF EXISTS `cap.published`;
CREATE TABLE `cap.published` (
`Id` bigint NOT NULL,
`Version` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`Name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`Content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL,
`Retries` int NULL DEFAULT NULL,
`Added` datetime NOT NULL,
`ExpiresAt` datetime NULL DEFAULT NULL,
`StatusName` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
PRIMARY KEY (`Id`) USING BTREE,
INDEX `IX_ExpiresAt`(`ExpiresAt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of cap.published
-- ----------------------------
INSERT INTO `cap.published` VALUES (2163263118141927425, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263118141927425\",\"cap-corr-id\":\"2163263118141927425\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 16:42:24 \\u002B08:00\"},\"Value\":\"2022-09-20T16:42:24.7124552+08:00\"}', 0, '2022-09-21 16:42:25', '2022-09-21 16:42:25', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263124588572673, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263124588572673\",\"cap-corr-id\":\"2163263124588572673\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:08:38 \\u002B08:00\"},\"Value\":\"2022-09-20T17:08:38.5765601+08:00\"}', 0, '2022-09-21 17:08:39', '2022-09-21 17:08:39', 'Succeeded');
INSERT INTO `cap.published` VALUES (2163263128052154369, 'v1', 'sample.rabbitmq.mysql', '{\"Headers\":{\"cap-callback-name\":null,\"cap-msg-id\":\"2163263128052154369\",\"cap-corr-id\":\"2163263128052154369\",\"cap-corr-seq\":\"0\",\"cap-msg-name\":\"sample.rabbitmq.mysql\",\"cap-msg-type\":\"DateTime\",\"cap-senttime\":\"2022-09-20 17:22:44 \\u002B08:00\"},\"Value\":\"2022-09-20T17:22:44.1822906+08:00\"}', 0, '2022-09-20 17:22:44', '2022-09-21 17:22:45', 'Succeeded');
SET FOREIGN_KEY_CHECKS = 1;
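After running the script, a quick sanity check confirms the sample rows landed (a sketch; the dump above inserts three rows):

```sql
-- row count should match the three INSERTs above
SELECT COUNT(*) AS rows_loaded FROM `cap.published`;

-- spot-check a few columns
SELECT Id, `Name`, Retries, StatusName
FROM `cap.published`
ORDER BY Added;
```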
Configure Logstash
Edit Logstash's core configuration file, which specifies the input data source and the output destination. The following steps are needed.
First, check your MySQL version:
SELECT version();
Then download the MySQL Connector/J driver matching your MySQL version (mine was mysql-connector-java-8.0.27.zip), extract it, and put the jar in the /usr/share/logstash/pipeline directory.
Driver download: MySQL Community Downloads
mysql-es.conf
Create the mysql-es.conf file in Logstash's config directory (/usr/share/logstash/config). Logstash will read data from MySQL according to this configuration and write it into the ES index.
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi mysql-es.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.3.40:3306/es_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"
    # the user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => "123456"
    # the path to our downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-8.0.27.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    jdbc_default_timezone => "Asia/Shanghai"
    #record_last_run => "true"
    #use_column_value => "true"
    #tracking_column => "change_date"
    # if the tracking column is a datetime field (e.g. updateTime), its type must be set to timestamp
    #tracking_column_type => "timestamp"
    #statement => "SELECT Id,`Name` as CName,Content,Added FROM `cap.published` as cap_published"
    statement => "SELECT * FROM `cap.published` as cap_published"
    # schedule: run the query once every minute (cron syntax)
    schedule => "* * * * *"
  }
}
output {
  elasticsearch {
    # ES host(s) and port
    #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
    hosts => "192.168.3.40:9200"
    manage_template => false
    user => "longfc"
    password => "lfc123456"
    # ES index name
    index => "cap_published"
    document_id => "%{id}"
  }
  stdout {
    # also print each event to the log
    codec => json_lines
  }
}
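The commented-out record_last_run / tracking_column options above are how you switch from repeated full loads to incremental sync. A minimal sketch, assuming the table had an indexed datetime column (here a hypothetical change_date; cap.published itself has no such column):

```ruby
jdbc {
  # ... same connection, driver, and schedule settings as above ...
  record_last_run => true
  use_column_value => true
  tracking_column => "change_date"
  # must be timestamp for datetime columns
  tracking_column_type => "timestamp"
  # :sql_last_value is substituted by Logstash with the last tracked value
  statement => "SELECT * FROM `cap.published` WHERE change_date > :sql_last_value"
}
```

Logstash persists the last tracked value on disk (by default in a .logstash_jdbc_last_run file in the Logstash user's home directory), so the pipeline resumes where it left off after a restart.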
Notes:
- Create the es_test database and the cap.published table in advance;
- The Elasticsearch user must be created in Kibana beforehand, with the appropriate roles and permissions;
- In theory the index does not need to exist up front and ES can auto-create it on first write (the SQL statement must alias the table/columns to names that are valid for an ES index), but in practice the sync only succeeded after the index was created manually;
- Index names must be lowercase.
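Since the sync only worked once the index existed, you can create cap_published up front in Kibana Dev Tools. A minimal sketch; the field types below are assumptions inferred from the table schema, not requirements:

```
PUT cap_published
{
  "mappings": {
    "properties": {
      "id":         { "type": "long" },
      "version":    { "type": "keyword" },
      "name":       { "type": "keyword" },
      "content":    { "type": "text" },
      "retries":    { "type": "integer" },
      "added":      { "type": "date" },
      "expiresat":  { "type": "date" },
      "statusname": { "type": "keyword" }
    }
  }
}
```

The field names are lowercase because the jdbc input lowercases column names by default, which is also why document_id uses %{id} rather than %{Id}.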
logstash.yml
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
vi logstash.yml
http.host: "0.0.0.0"
# xpack.monitoring.elasticsearch.hosts: [ "http://192.168.3.40:9200" ]
# xpack.monitoring.elasticsearch.username: "elastic"
# xpack.monitoring.elasticsearch.password: "F3SXEhBp3Jysfd9OyhJX"
xpack.monitoring.enabled: false
pipelines.yml
# Enter the container
docker exec -it logstash /bin/bash
# Go to the config directory
cd /usr/share/logstash/config
# Edit pipelines.yml
vi pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: main
  #path.config: "/usr/share/logstash/pipeline"
  path.config: "/usr/share/logstash/config/mysql-es.conf"
Restart the container
# Exit the container
exit
# Restart the container
docker restart logstash
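Once the container is back up, Logstash runs the query on the next minute boundary. You can watch docker logs -f logstash for the json_lines output of each synced row, or query ES from Kibana Dev Tools (a sketch, assuming the index name cap_published from the config above):

```
GET cap_published/_count

GET cap_published/_search
{
  "query": { "match_all": {} }
}
```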