技术提问

1. 实践

a. 流批一体有那些实践案例,离线链路如何修正实时链路?

b. MySQL数据同步到iceberg的最佳实践,如何调整写入方式实现最快的查询性能,能写入时按主键分桶吗?

  • 开源版本 flink-cdc-connector 目前的不足:

    • 一张mysql cdc表,对应一张iceberg表。所有的列都需要写一遍,非常繁琐。

      • 解法: 引入新的Flink SQL语法
        1. CREATE TABLE iceberg_table
        2. AS (SELECT * FROM mysql_cdc_table);
    • 一张mysql cdc表,对应一张iceberg表,对应一个flink streaming job。线上mysql 1000张表,则意味着线上的mysql binlog要被1000个mysql slave拉取,压力太大。

      • 解法:建议使用 debezium 工具将binlog通过一个作业导入 Kafka,再用mysql-cdc-connector或其他工具统一从kafka拉取,减轻拉取线上mysql binlog的负担。
    • flink-cdc-connector 在拉取全量+增量的时候,需要锁表。而且导出全量数据是单线程,大表很慢。

image.png

  - 解法:参考 [DBLog](https://arxiv.org/pdf/2010.12597v1.pdf) 和 [DDD-3](https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md) 论文实现新版本的 flink-cdc-connector。
  • 建议按主键分 bucket。
    • 写入时按照bucket(pk) shuffle写入
      • 保证语义正确性:同一个key的不同版本落在同一个并发之上。
      • 各bucket内数据均衡
        • 一个bucket一个writer,避免打开太多writer导致流式作业出现OOM。
      • 读取时大大缩小待JOIN的delete文件。

image.png

  • 创建表格时按照主键列做分区字段。 ```sql — correct CREATE TABLE iceberg_cdc_table ( user_id STRING, last_visit_ts STRING, info STRING, PRIMARY KEY (user_id)
    ) PARTITIONED BY (user_id) WITH ( ‘connector’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’ = ‘thrift://localhost:9083’, ‘warehouse’=’hdfs://nn:’ );

— incorrect CREATE TABLE iceberg_cdc_table ( user_id STRING, last_visit_ts STRING, info STRING, PRIMARY KEY (user_id)
) PARTITIONED BY (last_visit_ts) WITH ( ‘connector’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’ = ‘thrift://localhost:9083’, ‘warehouse’=’hdfs://nn:’ ); ```

a. 边写入边合并时,乐观锁机制。容易发生冲突,导致重试频繁?

  • 对于 v1 来说:
    • append-only的两个 txn 发生冲突,并不会产生额外的成本。后面的 txn 直接提交即可。
    • 最佳实践:
      • 5 min 做checkpoint实时写入
      • 采用定时调度的方式,启动离线作业来合并。
  • 对于 v2 来说:
    • 若批量更新作业与compaction作业冲突,那么要么批量更新作业重试,要么compaction作业重试。
    • 若流式更新作业与compaction作业冲突,那么直接按照v1策略重试即可。

image.png

更详细的上下文可以参考《Handle the case that RewriteFiles and RowDelta commit the transaction at the same time》。

b. Kafka schema 变更,能否实现自动或一键式下沉到 iceberg ?

3. 读取

a. Trino 暂时不支持读取 v2 格式表,需要通过 SparkSQL,如果是用 Pilo 提供服务,如何做到路由?

  • aws 的 trino团队 和阿里云的trino团队,在推进这件事情。不需要通过额外的路由服务来实现。

b. Spark SQL 当前通过kyuubi提供服务,存在任务启动延迟,请求带来 30s 左右额外耗时?

  • 不太熟悉kyuubi这个组件,启动延迟需要profile一下。

4. 增量处理

a. 增量处理如何实现?

b. 增量处理支持批处理模式,还是流式模式?

  • 目前社区版本 Flink 支持流模式,同时也支持批模式。
  • 目前社区版本 Spark 支持 Structure Streaming,同时也支持批作业读取。

c. 当前 Flink 支持从某个Snopshot开始的增量读取,Spark能否支持?

  • 支持的。

    5. Iceberg 表管理

a. Iceberg 表接入元数据中心,感知建表、删表操作,或统一管理。

1. parquet格式支持bloom filter加速

https://github.com/apache/iceberg/pull/2642

2. Spark On HDFS 加速

https://github.com/apache/iceberg/pull/2577/files

a. 设置option禁止某些情况下去拿block的location。
b. 多线程启动ReadTask。