业务数据库需要做的修改

MySQL 业务数据

配置 MySQL 数据库

  • 开启 binlog
  1. # 开启 binlog
  2. server-id = 223344
  3. log_bin = mysql-bin
  4. binlog_format = ROW # 这里一定是 row 格式
  5. binlog_row_image = FULL
  6. expire_logs_days = 10 # 日志保留时间

:::info

  • server-id:对于 MySQL 集群中的每个服务器和复制客户端,server-id 的值必须是唯一的。 在 MySQL 连接器设置期间,Debezium 为连接器分配一个唯一的服务器 ID。
  • log_bin:log_bin 的值是 binlog 文件序列的基本名称。
  • binlog_format:binlog-format 必须设置为 ROW 或 row。
  • binlog_row_image:binlog_row_image 必须设置为 FULL 或 full。
  • expire_logs_days:这是自动删除 binlog 文件的天数。 默认值为 0,表示不自动删除。 根据自己的需要设置该值。

:::

创建用于同步数据的用户并授权

:::info 需要 SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT 权限。

  • SELECT:使连接器能够从数据库中的表中选择行。 这仅在执行快照时使用。
  • RELOAD:允许连接器使用 FLUSH 语句来清除或重新加载内部缓存、刷新表或获取锁。 这仅在执行快照时使用。
  • SHOW DATABASES:通过发出 SHOW DATABASE 语句,使连接器能够查看数据库名称。 这仅在执行快照时使用。
  • REPLICATION SLAVE:使连接器能够连接并读取 MySQL 服务器 binlog。
  • REPLICATION CLIENT:允许连接器使用以下语句,连接器总是需要这个。
    • 显示 MASTER 状态
    • 显示 SLAVE 状态
    • 显示二进制日志

:::

  • 使用 root 用户登录 MySQL
  1. mysql
  • 创建 Debezium 用于同步数据的用户并授权
  1. CREATE USER 'debezium'@'%' IDENTIFIED BY '123456';
  2. GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
  3. FLUSH PRIVILEGES;

Postgres 业务数据

配置 Postgres 数据库

  • 修改 postgresql.conf
  1. vim /etc/postgresql/12/main/postgresql.conf
  2. # 更改wal日志方式为 logical
  3. wal_level = logical # minimal, replica, or logical
  4. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s
  5. wal_sender_timeout = 180s # in milliseconds; 0 disable
  6. # 指示服务器使用最多 10 个单独的进程来处理 WAL 更改,需要根据实际情况修改该值。
  7. max_wal_senders = 10
  8. # 指示服务器允许为流式 WAL 更改创建最多 10 个复制槽,需要根据实际情况修改该值。
  9. max_replication_slots = 10
  • 修改 pg_hba.conf
  1. vim /etc/postgresql/12/main/pg_hba.conf
  2. # TYPE DATABASE USER ADDRESS METHOD
  3. ############ REPLICATION ##############
  4. local replication all trust
  5. host replication all 127.0.0.1/32 trust
  6. host replication all ::1/128 trust

:::info

  • METHOD 这里指定的是 trust,无条件地允许连接。这种方法允许任何可以与 PostgreSQL 数据库服务器连接的用户以他们期望的任意 PostgreSQL 数据库用户身份登入,而不需要口令或者其他任何认证。
  • USER 这里指定的是 all,如果已创建具有 REPLICATION 和 LOGIN 权限的其他用户,可以更改为其他用户。
  • ADDRESS 这里指定的是 127.0.0.1/32(IPv4 回环地址) 和 ::1/128(IPv6 回环地址),可以更改为 0.0.0.0/0 表示所有 IPv4 地址,::0/0表示所有 IPv6 地址

:::

  • 重启 postgresql 服务生效,所以一般是在业务低峰期更改
  1. sudo systemctl daemon-reload
  2. sudo systemctl restart postgresql

创建用于同步数据的用户并授权

:::info 需要 REPLICATION,LOGIN,SELECT,CREATE 权限。

要将表添加到 publication,用户必须是表的所有者,表所有者自动拥有表的 SELECT 权限,所以只需要 CREATE 权限。

REPLICATION:为了将表添加到 publication,所以需要数据库上的 REPLICATION 权限。

CREATE:为了添加添加发布,所以需要数据库上的 CREATE 权限。

SELECT:为了复制初始表数据,所以需要表上的 SELECT 权限, 表所有者自动拥有表的 SELECT 权限。

:::

  • 使用 postgres 用户登录 PostgreSQL
  1. sudo -u postgres psql
  • 方式一:为表的所有者增加 REPLICATION,LOGIN 和 CREATE 权限
  1. -- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
  2. ALTER ROLE postgres WITH REPLICATION LOGIN;
  3. -- mydb 数据库创建新的模式和发布的权限赋给 debezium
  4. GRANT CREATE ON DATABASE mydb TO postgres;
  • 方式二:由于源表已经存在,使用复制组来与原始所有者共享所有权

:::info 但是由于源表已经存在,需要一种机制来与原始所有者共享所有权。 要启用共享所有权,你需要创建一个 PostgreSQL 复制组,然后将现有表所有者和复制用户添加到该组。

:::

  1. -- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
  2. CREATE USER debezium WITH PASSWORD '123456' REPLICATION LOGIN;
  3. -- 创建一个 replication_group 用于共享所有权
  4. CREATE ROLE replication_group;
  5. -- 将现有表所有者和复制用户添加到该组
  6. GRANT replication_group TO postgres;
  7. GRANT replication_group TO debezium;
  8. -- 将表的所有权转交给 replication_group,这样 debezium postgres都用表的所有权
  9. ALTER TABLE public.shipments OWNER TO replication_group;
  10. -- mydb 数据库创建新的模式和发布的权限赋给 debezium
  11. GRANT CREATE ON DATABASE mydb TO debezium;

同步业务数据到 Hudi(ODS)

下载 pitrix-data-warehouse

  1. git clone git@git.internal.yunify.com:geekspeng/pitrix-data-warehouse.git
  2. cd pitrix-data-warehouse

写入 Hudi 时同步到 Hive

  • 打开根目录下的 warehouse.yaml,配置 Hive
  1. hive_sync:
  2. metastore_uris: 'thrift://localhost:9083'
  3. username: 'hive' # HMS 用户名
  4. password: 'hive' # HMS 密码
  5. db: 'warehouse' # Hive 的数据库

配置需要同步的 Postgres 业务数据库

  • 打开根目录下的 warehouse.yaml,配置需要同步的 Postgres 业务数据库(可以配置多个数据库地址)
  1. pg_sync:
  2. - hostname: '172.31.1.247' # 数据库地址
  3. port: '5432' # 数据库端口
  4. username: 'yunify' # 数据库用户名
  5. password: '123456' # 数据库密码
  6. databases: # 需要同步的数据库列表
  7. - db: 'zone' # 数据库名称(可以配置多个数据库)
  8. schema: 'public' # schema 名称
  9. table:
  10. - name: 'instance'
  11. - name: 'image'
  12. zone_id: 'pek3a' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
  13. - name: 'eip'
  14. zone_id: 'pek3a'
  15. - name: 'dns_alias'
  16. zone_id: 'pek3a'
  17. zone: 'pek3a' # 用于指定不同 zone 的 SQL 存放目录
  18. - db: 'global' # 数据库名称(可以配置多个数据库)
  19. schema: 'public' # schema 名称
  20. table:
  21. - name: 'wan_access'
  22. zone_id: 'global'
  23. - name: 'wan_cpe_order'
  24. zone_id: 'global' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
  25. - name: 'wan_physical_connect'
  26. zone_id: 'global'
  27. - name: 'app_saas_instance'
  28. zone_id: 'global'
  29. - name: 'gatewayapi'
  30. zone_id: 'global'
  31. - db: 'billing_resource'
  32. schema: 'public'
  33. table:
  34. - name: 'leasing'
  35. - name: 'leasing_contract'
  36. - name: 'leased'
  37. - name: 'leased_contract'
  38. - name: 'reserved_contract'
  39. - hostname: '172.31.1.244' # 数据库地址
  40. port: '5432' # 数据库端口
  41. username: 'yunify' # 数据库用户名
  42. password: '123456' # 数据库密码
  43. databases: # 需要同步的数据库列表
  44. - db: 'zone' # 数据库名称
  45. schema: 'public' # schema 名称(可以配置多个数据库)
  46. table:
  47. - name: 'instance'
  48. - name: 'nfv'
  49. - name: 'image'
  50. zone_id: 'pek3' # 同步数据时,添加 zone_id 字段,并且值为 'pek3a'
  51. zone: 'pek3' # 用于指定不同 zone 的 SQL 存放目录

生成数据同步 SQL并提交(自动查询业务数据库的 Schema 并生成SQL)

:::info

注意⚠️ 需要 Python 3.6 以上版本

:::

  • 切换路径
  1. cd pitrix-data-warehouse
  • 安装 pip 包
  1. pip install -r requirements.txt

:::color4 安装 pip install psycopg2 报错如下

pg_config executable not found

解决方法:

sudo apt-get install libpq-dev

参考文档

pg_config executable not found

:::

  • 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
  1. python3 ods/ods_sync.py

生成的 SQL 保存在 ods 对应的 db_name 目录下,如果数据库为 zone,会根据配置的 zone 创建一个子目录,SQL 文件名称为 sync_table_name.sql

接入资源维度表文档 - 图1

  • 通过 submit_job.py 提交 job
  1. /submit_job.py --help
  2. usage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]
  3. optional arguments:
  4. -h, --help show this help message and exit
  5. -f FILE, --file FILE Script file that should be executed.
  6. -d DIRECTORY, --directory DIRECTORY
  7. Script file that should be executed.
  8. -i INIT, --init INIT Script file that used to init the session context
  9. -j JAR, --jar JAR A JAR file to be imported into the session
  10. -l LIBRARY, --library LIBRARY
  11. A JAR file directory with which every new session is initialized

:::info

  • -f:提交单个 SQL
  • -d:提交目录下的所有 SQL
  • -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
    • DDL(CREATE/DROP/ALTER)
    • USE CATALOG/DATABASE
    • LOAD/UNLOAD MODULE
    • SET command
    • RESET command

common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件

如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化

  • -j:指定依赖文件
  • -l:指定依赖的目录

:::

  1. - 提交单个 SQL
  1. ./submit_job.py -f ods/global/sync_wan_access.sql
  1. - 提交目录下的所有 SQL
  1. ./submit_job.py -d ods/global
  • 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
  1. /usr/local/flink/bin/sql-client.sh -i common-init.sql -f ods/global/sync_wan_access.sql

生成资源维度表(DIM)

配置资源维度需要抽象的表

  1. dim_resource:
  2. databases:
  3. - db: 'zone'
  4. table:
  5. - name: 'instance'
  6. - name: 'cluster'
  7. resource_name_field: 'name' # resource_name 映射的字段
  8. resource_type_field: 'app_id' # resource_type 映射的字段
  9. - name: 's2_server'
  10. resource_type_field: 'service_type' # resource_type 映射的字段
  11. - name: 'vpc_border'
  12. resource_name_field: 'border_name' # resource_name 映射的字段
  13. - name: 'routing_table'
  14. alias: 'rtable' # 如果配置了表的别名,那么 resource_id,resource_name 根据别名映射的字段
  15. - name: 'security_group'
  16. alias: 'group'
  17. - name: 'waf_rule_group'
  18. alias: 'rule_group'
  19. - name: 'key_pair'
  20. - name: 'nfv'
  21. resource_type_field: 'nfv_type'
  22. router_id_field: 'vpc_router_id' # router_id 映射的字段
  23. - db: 'global'
  24. table:
  25. - name: 'wan_access'
  26. resource_type_field: 'access_type' # resource_type 映射的字段
  27. resource_type_prefix: 'sdwan_' # resource_type 的前缀
  28. - name: 'wan_cpe_order'
  29. resource_type_value: 'sdwan_cpe_order' # resource_type 的值
  30. - name: 'wan_physical_connect'
  31. resource_type_value: 'sdwan_physical_connect' # resource_type 的值
  32. - name: 'app_saas_instance'
  33. alias: 'ins'
  34. resource_type_field: 'app_id' # resource_type 映射的字段
  35. user_id_field: 'user_id' # user_id 映射的字段
  36. - name: 'gatewayapi'
  37. alias: 'api'
  38. user_id_field: 'user_id'

生成数据同步 SQL并提交

:::info

注意⚠️ 需要 Python 3.6 以上版本

:::

  • 切换路径
  1. cd pitrix-data-warehouse
  • 生成数据同步 SQL(每个数据库的每个表一个SQL 文件)
  1. python3 dim/dim_resource/dim_resource.py

生成的 SQL 保存在 dim 对应的 dim_resource 目录下,SQL 文件名称为 table_name.sql接入资源维度表文档 - 图2

  • 通过 submit_job.py 提交 job
  1. /submit_job.py --help
  2. usage: submit_job.py [-h] [-f FILE] [-d DIRECTORY] [-i INIT] [-j JAR] [-l LIBRARY]
  3. optional arguments:
  4. -h, --help show this help message and exit
  5. -f FILE, --file FILE Script file that should be executed.
  6. -d DIRECTORY, --directory DIRECTORY
  7. Script file that should be executed.
  8. -i INIT, --init INIT Script file that used to init the session context
  9. -j JAR, --jar JAR A JAR file to be imported into the session
  10. -l LIBRARY, --library LIBRARY
  11. A JAR file directory with which every new session is initialized

:::info

  • -f:提交单个 SQL
  • -d:提交目录下的所有 SQL
  • -i:用于初始化的 SQL,初始化 SQL 文件中允许使用以下语句:
    • DDL(CREATE/DROP/ALTER)
    • USE CATALOG/DATABASE
    • LOAD/UNLOAD MODULE
    • SET command
    • RESET command

common-init.sql 用来初始化公共参数,默认使用根目录下的 common-init.sql,如果 SQL 同目录下有 common-init.sql,则使用该文件

如果未指定该参数, 同时 SQL 同目录下有同名的后缀为 -init.sql 的文件,则会使用该文件进行初始化

  • -j:指定依赖文件
  • -l:指定依赖的目录

:::

  1. - 提交单个 SQL
  1. ./submit_job.py -f dim/dim_resource/instance.sql
  1. - 提交目录下的所有 SQL
  1. ./submit_job.py -d dim/dim_resource
  • 通过 Flink SQL Client 提交(一次只能提交一个 SQL,并且需要自己指定用于初始化的 SQL)
  1. /usr/local/flink/bin/sql-client.sh -i common-init.sql -f dim/dim_resource/instance.sql