37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图1

作者: 37手游大数据开发-徐润柏

本文介绍了37手游基于FlinkCDC+Hudi湖仓一体方案实践,主要内容包括:

  1. FLINK CDC基本知识介绍
  2. HUDI基本知识介绍
  3. 37手游的业务痛点和技术方案选型
  4. 37手游湖仓一体介绍
  5. Flink CDC+HUDI实践
  6. 总结

一.FLINK-CDC 2.0

Flink CDC ConnectorsApache Flink的一个source端的连接器,目前2.0版本支持从 MySQL 以及 Postgres两种数据源中获取数据,2.1版本社区确定会支持Oracle, MongoDB数据源。

Flink CDC 底层封装了 Debezium, Debezium 同步一张表会分为两个阶段:

  • 全量阶段:查询当前表中所有记录
  • 增量阶段:从 binlog 消费变更数据

由于1.x版本存在以下三个问题:

  • 一致性需要加锁来保证的,即需要给锁权限,因此会存在hang 住数据库的风险
  • 不支持水平扩展,即在全量snapshot阶段只支持单并行度去读取,亿级别大表的读取时间基本需要小时级别
  • 全量snapshot阶段不支持checkpoint,即snapshot期间每次checkpoint均会失败,需要调整checkpoint最大失败次数才能保证正常拉取数据

综上所述, Flink CDC 2.0 的主要核心是要解决上述的三个问题,即要实现无锁、支持水平扩展checkpoint
而Flink CDC实现无锁,主要是参考 Netflix 的 DBlog 论文里描述的无锁算法,该算法的核心思想是在划分了 Chunk 后,对于每个 Chunk 的全量读取和增量读取,在不用锁的条件下完成一致性的合并。

由于该算法需要在数据库中维护一张信号表,再通过信号表在 binlog 文件中打点,记录低点位和高点位。因此Flink CDC 结合自身的情况,在 Chunk 读取算法上做了去信号表的改进,不需要额外维护信号表,通过直接读取 binlog 位点替代在 binlog 中做标记的功能。整体流程可以概括为:

首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader,每个 Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读,SourceReader 读取时支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 读取完成后,下发一个 binlog chunk 进行增量部分的 binlog 读取,这便是 Flink CDC 2.0 的整体流程,如下图所示:

37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图2

总结Fink CDC 2.0的核心feature,主要表现为实现了以下三个非常重要的功能:

  • 全程无锁,不会对数据库产生需要加锁所带来的风险
  • 多并行度,全量数据的读取阶段支持水平扩展,使亿级别的大表可以通过加大并行度来加快读取速度
  • 断点续传,全量阶段支持checkpoint,即使任务因某种原因退出了,也可通过保存的checkpoint对任务进行恢复实现数据的断点续传

二.HUDI

Apache Hudi目前被业内描述为围绕数据库内核构建的流式数据湖平台(Streaming Data Lake Platform)

1. HUDI的表类型

COW:使用parquet文件格式存储数据。通过在写入期间执行同步合并,更新文件版本和重写文件。

37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图3

MOR:使用parquet +avro文件格式的组合存储数据。将数据记录到增量log文件中,然后以同步或异步方式压缩生成新版本的parquet 文件。

37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图4

2. HUDI的三种写入类型

  • UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 => CDC 导入
  • INSERT:跳过 index,写入效率更高 => Log Deduplication
  • BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)

3. HUDI的核心优势

  • 写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。
  • 对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)

三.37手游的业务痛点和技术方案选型

业务痛点:

1. 数据实时性不够

  • 新增类的数据通过cannal每半小时同步前一个小时的数据到hive
  • 更新类数据则通过cannal每半小时同步当天全量数据到hive

2. 业务代码逻辑复杂且难维护

  • 目前37手游还有很多的业务开发沿用Mysql+PHP的开发模式,代码逻辑复杂且很难维护
  • 相同的代码逻辑往往流处理需要开发一份代码,批处理也需要开发一份代码,不能复用

3. 频繁重刷历史数据

  • 繁地重刷历史数据来保证数据一致

4. Schema变更频繁

  • 由于业务需求,经常需要添加表字段

5. Hive版本低

  • 目前hive使用版本为1.x版本,并且升级版本比较困难
  • 不支持Upsert
  • 不支持行级别的delete

由于37手游的业务场景,数据upsert,delete是个很常见的需求。所以基于Hive数仓的架构对业务需求的满足度不够。

技术选型:

针对上述存在的业务痛点以及Flink CDC 2.0目前只支持Flink 1.13.x的版本,因此我们选择Flink1.13.2作为计算引擎,依靠Flink提供的流批统一的API,基于Flink-SQL实现流批一体,解决维护两套代码的业务痛点。

在存储引擎的选型上,目前最热门的数据湖产品当属:Apache Hudi 和 Apache Iceberg和 DeltaLake。在我们的场景下各有优劣,最终基于Hudi对上下游生态的开放,对全局索引的支持,对Flink 1.13版本的支持以及对Hive版本的兼容性上(Iceberg不支持hive1.x的版本)等原因,最终选择了Hudi作为湖仓一体和流批一体的存储引擎。

因此最终的技术方案为:基于最新的Flink 1.13.2作为计算引擎,Flink-CDC 2.0作为ODS层的数据同步工具以及Hudi-0.10 Master作为存储引擎的湖仓一体最终选型方案。

四.湖仓一体

37手游的湖仓一体方案,是37手游流批一体架构的一部分。通过湖仓一体、流批一体,准实时场景下做到了:数据同源、同计算引擎、同存储、同计算口径。数据的时效性可以到分钟级,能很好的满足业务准实时数仓的需求。下面是架构图:

37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图5

MYSQL数据通过Flink CDC进入到Kafka。之所以数据先入Kafka不直接入Hudi,是为了实现多个实时任务复用Mysql过来的数据,避免多个任务通过Flink CDC接Mysql表以及binlog,对mysql库的性能造成影响。通过 CDC进入到Kafka的数据落一份到离线数据仓库的ODS层之外,同时按照实时数据仓库的链路,从ODS->DWD->DWS..->OLAP数据库,最后供报表等数据服务使用。实时数仓的每一层结果数据会准实时的落一份到离线数仓,通过这种方式做到程序一次开发、指标口径统一,数据统一。

从架构图上,我们看到有一步数据修正(重跑历史数据)的动作,之所有有这一步是考虑到有可能存在由于口径调整或者前一天的实时任务计算结果错误导致重跑历史数据的情况,而存在Kafka的数据有失效时间,不会存太久的历史数据,如果重跑很久的历史数据则无法从Kafka中获取历史源数据。再者如果把大量的历史数据再一次推到kafka,走实时计算的链路来修正历史数据,可能会影响当天的实时作业。所以重跑历史数据的话,我们通过数据修正这一步来处理。总体上说,37手游的数据仓库属于Lamda和Kappa混搭的架构。流批一体数据仓库的各个数据链路有数据质量稽核的流程。第二天对前一天的数据进行对账,如果前一天实时计算的数据无异常,则不需要修正数据,Kappa架构足够。

五.Flink CDC 2.0+Kafka+Hudi 0.10实践

1. 环境准备

  • Flink 1.13.2
  • …/lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar(修改Master分支的hudi Flink版本为1.13.2然后构建)
  • …/lib/hadoop-mapreduce-client-core-2.7.3.jar(解决hudi ClassNotFoundException)
  • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar
  • ../lib/flink-format-changelog-json-2.0-SNAPSHOT.jar
  • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

source端Mysql-CDC表定义:

  1. create table sy_payment_cdc (
  2. ID BIGINT,
  3. ...
  4. PRIMARY KEY(ID) NOT ENFORCED
  5. ) with(
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = '',
  8. 'port' = '',
  9. 'username' = '',
  10. 'password' = '',
  11. 'database-name' = '',
  12. 'table-name' = '',
  13. 'connect.timeout' = '60s',
  14. 'scan.incremental.snapshot.chunk.size' = '100000',
  15. 'server-id'='5401-5416'
  16. );

值得注意的是:scan.incremental.snapshot.chunk.size参数需要根据实际情况来配置,如果表数据量不大,使用默认值即可.

sink端kafka+Hudi COW表定义:

  1. create table sy_payment_cdc2kafka (
  2. ID BIGINT,
  3. ...
  4. PRIMARY KEY(ID) NOT ENFORCED
  5. ) with (
  6. 'connector' = 'kafka',
  7. 'topic' = '',
  8. 'scan.startup.mode' = 'latest-offset',
  9. 'properties.bootstrap.servers' = '',
  10. 'properties.group.id' = '',
  11. 'key.format' = '',
  12. 'key.fields' = '',
  13. 'format' = 'changelog-json'
  14. );
  15. create table sy_payment2hudi (
  16. ID BIGINT,
  17. ...
  18. PRIMARY KEY(ID) NOT ENFORCED
  19. )
  20. PARTITIONED BY (YMD)
  21. WITH (
  22. 'connector' = 'hudi',
  23. 'path' = 'hdfs:///data/hudi/m37_mpay_tj/sy_payment',
  24. 'table.type' = 'COPY_ON_WRITE',
  25. 'partition.default_name' = 'YMD',
  26. 'write.insert.drop.duplicates' = 'true',
  27. 'write.bulk_insert.shuffle_by_partition' = 'false',
  28. 'write.bulk_insert.sort_by_partition' = 'false',
  29. 'write.precombine.field' = 'MTIME',
  30. 'write.tasks' = '16',
  31. 'write.bucket_assign.tasks' = '16',
  32. 'write.task.max.size' = '48000',
  33. 'write.merge.max_memory' = '40000'
  34. );

值得注意的是,此表的QPS并不大,因此选择COW表,QPS如果达到20000以上,建议还是选择MOR表.

另外为了加快历史数据入湖,这里配置了write.tasks以及write.bucket_assign.tasks的并行度为16,并且write.task.max.size配置了40多G.等到所有历史数据入湖后,我们需要相应的调小内存配置至合适的值并且将source端的并行度设置为1,然后指定checkpoint重启整个任务.

因为Flink CDC 2.0可以根据checkpoint记录的binlog点位接着进行解析,因此任务调整期间的数据不会丢失.

37手游基于FlinkCDC Hudi湖仓一体方案实践 - 图6

  • 按照上面表定义的参数配置,配置16个并行度,Flink TaskManager内存大小为50G的情况下,单表15亿历史数据入至Hudi COW表实际用时10小时,单表9亿数据入至Hudi COW表实际用时6小时.

    目前我们的集群由200多台机器组成,在线的流计算任务总数有200多,总数据量接近2PB.

    当然,如果集群资源很有限的情况下,可以根据具体情况调整Hudi表以及Flink任务的内存配置,让历史数据缓慢入湖,这样就不需要再调整任务资源然后指定checkpoint去重启任务.

2. 数据比对

  • 由于生产环境用的是hive1.x,Hudi对于1.x还不支持数据同步,所以通过创建hive外部表的方式进行查询,如果是hive2.x以上版本,可参考HIVE同步章节
  • 创建hive外部表+预创建分区
  • auxlib文件夹添加hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar
  1. CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(
  2. `_hoodie_commit_time` string,
  3. `_hoodie_commit_seqno` string,
  4. `_hoodie_record_key` string,
  5. `_hoodie_partition_path` string,
  6. `_hoodie_file_name` string,
  7. `ID` bigint,
  8. ...
  9. )
  10. PARTITIONED BY (
  11. `dt` string)
  12. ROW FORMAT SERDE
  13. 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  14. STORED AS INPUTFORMAT
  15. 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  16. OUTPUTFORMAT
  17. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  18. LOCATION
  19. 'hdfs:///data/hudi/m37_mpay_tj/sy_payment'

最终查询Hudi数据(hive外部表的形式)与原来cannal同步的hive数据做比对得到:

1.总数一致
2.按天分组统计数量一致
3.按天分组统计金额一致

六.总结

湖仓一体以及流批一体架构对比传统数仓架构主要有以下好处:

  • Hudi提供了Upsert能力,解决频繁Upsert/Delete的痛点
  • 提供分钟级的数据,比传统数仓更高的时效性
  • 基于Flink-SQL实现了流批一体,代码维护成本低
  • 数据同源、同计算引擎、同存储、同计算口径

最后针对频繁增加表字段的痛点需求,并且希望后续同步下游系统的时候能够自动加入这个字段,目前还没有完美的解决方案,希望Flink CDC社区能在后续的版本提供Schema Evolution的支持。

[Reference]

[1] https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

[2] https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#

[3] https://www.yuque.com/docs/share/5d1c383d-c3fc-483a-ad7e-d8181d6295cd?#

[4] https://flink-learning.org.cn/article/detail/3ebe9f20774991c4d5eeb75a141d9e1e