业务数据库需要做的修改
MySQL 业务数据
配置 MySQL 数据库
- 开启 binlog
# 开启 binlog
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW # 这里一定是 row 格式
binlog_row_image = FULL
expire_logs_days = 10 # 日志保留时间
:::info
- server-id:对于 MySQL 集群中的每个服务器和复制客户端,server-id 的值必须是唯一的。 在 MySQL 连接器设置期间,Debezium 为连接器分配一个唯一的服务器 ID。
- log_bin:log_bin 的值是 binlog 文件序列的基本名称。
- binlog_format:binlog-format 必须设置为 ROW 或 row。
- binlog_row_image:binlog_row_image 必须设置为 FULL 或 full。
- expire_logs_days:这是自动删除 binlog 文件的天数。 默认值为 0,表示不自动删除。 根据自己的需要设置该值。
:::
创建用于同步数据的用户并授权
:::info 需要 SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT 权限。
- SELECT:使连接器能够从数据库中的表中选择行。 这仅在执行快照时使用。
- RELOAD:允许连接器使用 FLUSH 语句来清除或重新加载内部缓存、刷新表或获取锁。 这仅在执行快照时使用。
- SHOW DATABASES:通过发出 SHOW DATABASE 语句,使连接器能够查看数据库名称。 这仅在执行快照时使用。
- REPLICATION SLAVE:使连接器能够连接并读取 MySQL 服务器 binlog。
- REPLICATION CLIENT:允许连接器使用以下语句,连接器总是需要这个。
- 显示 MASTER 状态
- 显示 SLAVE 状态
- 显示二进制日志
:::
- 使用 root 用户登录 MySQL
mysql
- 创建 Debezium 用于同步数据的用户并授权
CREATE USER 'debezium'@'%' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
FLUSH PRIVILEGES;
Postgres 业务数据
配置 Postgres 数据库
- 修改 postgresql.conf
vim /etc/postgresql/12/main/postgresql.conf
# 更改wal日志方式为 logical
wal_level = logical # minimal, replica, or logical
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
# 指示服务器使用最多 10 个单独的进程来处理 WAL 更改,需要根据实际情况修改该值。
max_wal_senders = 10
# 指示服务器允许为流式 WAL 更改创建最多 10 个复制槽,需要根据实际情况修改该值。
max_replication_slots = 10
- 修改 pg_hba.conf
vim /etc/postgresql/12/main/pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
############ REPLICATION ##############
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all ::1/128 trust
:::info
- METHOD 这里指定的是
trust
,无条件地允许连接。这种方法允许任何可以与 PostgreSQL 数据库服务器连接的用户以他们期望的任意 PostgreSQL 数据库用户身份登入,而不需要口令或者其他任何认证。 - USER 这里指定的是
all
,如果已创建具有 REPLICATION 和 LOGIN 权限的其他用户,可以更改为其他用户。 - ADDRESS 这里指定的是
127.0.0.1/32
(IPv4 回环地址) 和::1/128
(IPv6 回环地址),可以更改为0.0.0.0/0
表示所有 IPv4 地址,::0/0
表示所有 IPv6 地址
:::
- 重启 postgresql 服务生效,所以一般是在业务低峰期更改
sudo systemctl daemon-reload
sudo systemctl restart postgresql
创建用于同步数据的用户并授权
:::info 需要 REPLICATION,LOGIN,SELECT,CREATE 权限。
要将表添加到 publication,用户必须是表的所有者,表所有者自动拥有表的 SELECT 权限,所以只需要 CREATE 权限。
REPLICATION:为了将表添加到 publication,所以需要数据库上的 REPLICATION 权限。
CREATE:为了添加添加发布,所以需要数据库上的 CREATE 权限。
SELECT:为了复制初始表数据,所以需要表上的 SELECT 权限, 表所有者自动拥有表的 SELECT 权限。
:::
- 使用 postgres 用户登录 PostgreSQL
sudo -u postgres psql
- 方式一:为表的所有者增加 REPLICATION,LOGIN 和 CREATE 权限
-- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
ALTER ROLE postgres WITH REPLICATION LOGIN;
-- 把 mydb 数据库创建新的模式和发布的权限赋给 debezium
GRANT CREATE ON DATABASE mydb TO postgres;
- 方式二:由于源表已经存在,使用复制组来与原始所有者共享所有权
:::info 但是由于源表已经存在,需要一种机制来与原始所有者共享所有权。 要启用共享所有权,你需要创建一个 PostgreSQL 复制组,然后将现有表所有者和复制用户添加到该组。
:::
-- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
CREATE USER debezium WITH PASSWORD '123456' REPLICATION LOGIN;
-- 创建一个 replication_group 用于共享所有权
CREATE ROLE replication_group;
-- 将现有表所有者和复制用户添加到该组
GRANT replication_group TO postgres;
GRANT replication_group TO debezium;
-- 将表的所有权转交给 replication_group,这样 debezium 和 postgres都用表的所有权
ALTER TABLE public.shipments OWNER TO replication_group;
-- 把 mydb 数据库创建新的模式和发布的权限赋给 debezium
GRANT CREATE ON DATABASE mydb TO debezium;
同步业务数据到 Hudi(ODS)
下载 pitrix-data-warehouse
git clone git@git.internal.yunify.com:geekspeng/pitrix-data-warehouse.git
cd pitrix-data-warehouse
写入 Hudi 时同步到 Hive
- 打开根目录下的 warehouse.yaml,配置 Hive
hive_sync:
metastore_uris: 'thrift://localhost:9083'
username: 'hive' # HMS 用户名
password: 'hive' # HMS 密码
db: 'warehouse' # Hive 的数据库
配置需要同步的 Postgres 业务数据库
- 打开根目录下的 warehouse.yaml,配置需要同步的 Postgres 业务数据库(可以配置多个数据库地址)
pg_sync:
- hostname: '172.31.1.247' # 数据库地址
port: '5432' # 数据库端口
username: 'yunify' # 数据库用户名
password: '123456' # 数据库密码
databases: # 需要同步的数据库列表
- db: 'zone' # 数据库名称(可以配置多个数据库)
schema: 'public' # schema 名称
table:
- name: 'instance'
- name: 'image'
zone_id: 'pek3a' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
- name: 'eip'
zone_id: 'pek3a'
- name: 'dns_alias'
zone_id: 'pek3a'
zone: 'pek3a' # 用于指定不同 zone 的 SQL 存放目录
- db: 'global' # 数据库名称(可以配置多个数据库)
schema: 'public' # schema 名称
table:
- name: 'wan_access'
zone_id: 'global'
- name: 'wan_cpe_order'
zone_id: 'global' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
- name: 'wan_physical_connect'
zone_id: 'global'
- name: 'app_saas_instance'
zone_id: 'global'
- name: 'gatewayapi'
zone_id: 'global'
- db: 'billing_resource'
schema: 'public'
table:
- name: 'leasing'
- name: 'leasing_contract'
- name: 'leased'
- name: 'leased_contract'
- name: 'reserved_contract'
- hostname: '172.31.1.244' # 数据库地址
port: '5432' # 数据库端口
username: 'yunify' # 数据库用户名
password: '123456' # 数据库密码
databases: # 需要同步的数据库列表
- db: 'zone' # 数据库名称
schema: 'public' # schema 名称(可以配置多个数据库)
table:
- name: 'instance'
- name: 'nfv'
- name: 'image'
zone_id: 'pek3' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
zone: 'pek3' # 用于指定不同 zone 的 SQL 存放目录
生成数据同步 SQL并提交(自动查询业务数据库的 Schema 并生成SQL)
:::info
注意⚠️: 需要 Python 3.6 以上版本:::
- 切换路径
cd pitrix-data-warehouse
- 安装 pip 包
pip install -r requirements.txt
:::color4 安装 pip install psycopg2 报错如下
pg_config executable not found
解决方法:
sudo apt-get install libpq-dev
参考文档
pg_config executable not found
:::
- 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
python3 ods/ods_sync.py
生成的 SQL 保存在 ods 对应的 db_name 目录下,如果数据库为 zone,会根据配置的 zone 创建一个子目录,SQL 文件名称为 sync_table_name.sql
- 通过 submit_job.py 提交 job
/submit_job.py --help
usage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]
optional arguments:
-h, --help show this help message and exit
-f FILE, --file FILE Script file that should be executed.
-d DIRECTORY, --directory DIRECTORY
Script file that should be executed.
-i INIT, --init INIT Script file that used to init the session context
-j JAR, --jar JAR A JAR file to be imported into the session
-l LIBRARY, --library LIBRARY
A JAR file directory with which every new session is initialized
:::info
- -f:提交单个 SQL
- -d:提交目录下的所有 SQL
- -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
- DDL(CREATE/DROP/ALTER)
- USE CATALOG/DATABASE
- LOAD/UNLOAD MODULE
- SET command
- RESET command
common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件
如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化
- -j:指定依赖文件
- -l:指定依赖的目录
:::
- 提交单个 SQL
./submit_job.py -f ods/global/sync_wan_access.sql
- 提交目录下的所有 SQL
./submit_job.py -d ods/global
- 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
/usr/local/flink/bin/sql-client.sh -i common-init.sql -f ods/global/sync_wan_access.sql
生成资源维度表(DIM)
配置资源维度需要抽象的表
dim_resource:
databases:
- db: 'zone'
table:
- name: 'instance'
- name: 'cluster'
resource_name_field: 'name' # resource_name 映射的字段
resource_type_field: 'app_id' # resource_type 映射的字段
- name: 's2_server'
resource_type_field: 'service_type' # resource_type 映射的字段
- name: 'vpc_border'
resource_name_field: 'border_name' # resource_name 映射的字段
- name: 'routing_table'
alias: 'rtable' # 如果配置了表的别名,那么 resource_id,resource_name 根据别名映射的字段
- name: 'security_group'
alias: 'group'
- name: 'waf_rule_group'
alias: 'rule_group'
- name: 'key_pair'
- name: 'nfv'
resource_type_field: 'nfv_type'
router_id_field: 'vpc_router_id' # router_id 映射的字段
- db: 'global'
table:
- name: 'wan_access'
resource_type_field: 'access_type' # resource_type 映射的字段
resource_type_prefix: 'sdwan_' # resource_type 的前缀
- name: 'wan_cpe_order'
resource_type_value: 'sdwan_cpe_order' # resource_type 的值
- name: 'wan_physical_connect'
resource_type_value: 'sdwan_physical_connect' # resource_type 的值
- name: 'app_saas_instance'
alias: 'ins'
resource_type_field: 'app_id' # resource_type 映射的字段
user_id_field: 'user_id' # user_id 映射的字段
- name: 'gatewayapi'
alias: 'api'
user_id_field: 'user_id'
生成数据同步 SQL并提交
:::info
注意⚠️: 需要 Python 3.6 以上版本:::
- 切换路径
cd pitrix-data-warehouse
- 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
python3 dim/dim_resource/dim_resource.py
生成的 SQL 保存在 dim 对应的 dim_resource 目录下,SQL 文件名称为 table_name.sql
- 通过 submit_job.py 提交 job
/submit_job.py --help
usage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]
optional arguments:
-h, --help show this help message and exit
-f FILE, --file FILE Script file that should be executed.
-d DIRECTORY, --directory DIRECTORY
Script file that should be executed.
-i INIT, --init INIT Script file that used to init the session context
-j JAR, --jar JAR A JAR file to be imported into the session
-l LIBRARY, --library LIBRARY
A JAR file directory with which every new session is initialized
:::info
- -f:提交单个 SQL
- -d:提交目录下的所有 SQL
- -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
- DDL(CREATE/DROP/ALTER)
- USE CATALOG/DATABASE
- LOAD/UNLOAD MODULE
- SET command
- RESET command
common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件
如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化
- -j:指定依赖文件
- -l:指定依赖的目录
:::
- 提交单个 SQL
./submit_job.py -f dim/dim_resource/instance.sql
- 提交目录下的所有 SQL
./submit_job.py -d dim/dim_resource
- 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
/usr/local/flink/bin/sql-client.sh -i common-init.sql -f dim/dim_resource/instance.sql