• 听说你准备了面试题?
  • CDC程序还是有细节的
  • 业务表CDC程序设计
    • 一个表一个topic,还是?
    • 乱序问题,不处理试试?
  • CDC流应用写入Hudi优化
    • 不做cache,自取灭亡
    • 一次计算,扫描数百GB的缓存
    • 单线程调度,就等着Kafka丢数吧
    • 不要让所有表都写放大

写了快两个月Structured Streaming的代码,最近刚把数据迁移代码写完。
今晚有点时间,想着给大家分享一点我在基于Hudi实现CDC的一些经验。
每个公司的场景会有些不一样,
尤其是使用一些之前没有深度使用的技术,
每一种场景的切换,
都需要一路做各种尝试,各种优化,
CDC流式程序听起来简单,但其实还是有很多细节值得去考虑。

听说你准备了面试题?

我先把这些生产上大概率会遇到的问题放在这,大家看看脑海里是否有答案:

  1. 因为Hudi的底层存储是在HDFS,而流式程序在写入数据时,一定会产生大量小文件。Hudi里面提供了小文件的方案。在CDC的表数量很少的情况,看似一切正常。但如果有上百张、上千张,写放大会非常严重,如何提升效率?
  2. 在开发流式写入Hudi时,如何实现Hudi的删除数据功能?这点和Kudu会有较大差别,Kudu很容易实现行级的删除,但Hudi则不好操作,而且,CDC的数据是有序的。
  3. CDC日志如果数据都存储在几个topic中,什么情况会出现Kafka重复消费数据?如何避免?
  4. 业务数据库的schema是经常会发生变化的,一旦发生变化,就会导致写入到Hudi应用报错,否则就会丢失数据。怎么样管理好业务库的元数据和Hudi中的元数据一致呢?
  5. 因为开发Structured Streaming最终是以Cluster模式运行在YARN集群中的,配置文件如何处理的?
  6. 如果利用的是Structured Streaming的checkpoint机制,那么在项目中应该如何管理checkpoint的呢?
  7. 业务库中有几千张表,是需要将这几千张表全部上线到CDC吗?会有什么问题吗?
  8. 假设业务系统有一张业务表需要上线到CDC应用,如何上线?
  9. 因为业务表之前是有不少数据的,上线时怎么保证不丢数据?
  10. 如果要在Structured Streaming中写入上百张、上千张Hudi表,Spark是单线程调度写,还是多线程调度写的?
  11. 在多线程环境中调度Spark Job,如果某个线程抛出异常,会结束掉应用吗?如果没有结束应用会出现什么情况?
  12. 假设我们使用的是多线程调度Spark Job,某个线程抛出异常,怎么做到迅速结束所有调度?
  13. 可不可以为每个Hudi表建立一条Streaming Pipeline,为什么?会出现什么问题吗?
  14. 写Hudi表必须要提供主键,但在业务库中主键不是统一的主键,极端的情况,还有的表没有主键,如何处理方便管理呢?
  15. CDC流式写Hudi中,是否要保证字段有序?如果需要,如何保证?
  16. 因为会有两套程序写Hudi,一套是CDC流程序,还有一套是全量入湖程序,是否需要保证这两套程序写入到Hudi的schema是一致的?如何保证?
  17. 因为Kafka临时存储的数据量是有限的,如果CDC流程序出现故障,如何保证数据还能恢复回来呢?也就是,如果要让你设计一个数据重跑程序,你怎么设计呢?
  18. 业务表如果遇见了批量刷数,会给CDC流程序带来什么影响?你会怎么解决呢?
  19. CDC日志如果数量非常大,那么实时采集CDC日志的程序该怎么设计?Kafka的topic如何组织数据?
  20. CDC的乱序问题,如果有,怎么解决呢?
  21. 用了PySpark吗?说一说选择是哪种运行模式?为什么选择呢?
  22. PySpark中,关于UDF是如何开发的?为什么用这种方式开发?
  23. ….

暂时想到这么多,
里面有一些是跟Structured Streaming有关的,
不过很多问题,用其他流计算引擎也都会遇见。
所以,纠结用Spark还是Flink没用,还是要去解决问题。

CDC程序还是有细节的

大家看过了上面的问题,
大部分一说同步程序,
都会觉得CDC的流式应用看起来简单,
——就是解析一下CDC JSON日志,然后写入到Hudi就结束了。
但大家看看,
在写流式程序中碰到了这么多的问题,
都是要去解决的。
不然,它们就会不停地恶心你。
时不时跳出来咬你一口。
所以,架构固然重要,
对于开发来说,
细节也很重要。
上面列出的这些个问题都能解释得很清楚,
而且可以原理上跟面试官聊明白,
如果你去面试的刚好是流式应用开发,
应该是能拿到一个不差的offer。
背生硬的面试题别人是能感受到的。
结合着实际生产再去聊原理才有说服力。
篇幅有限,我试着慢慢来给大家介绍这些问题的处理方案。
肯定有更好的方案,我只说我在一个一般规模的生产上的尝试。
如果做Demo就算了,不会有说服力的。

业务表CDC程序设计

如果大家将来开发的应用数据量非常小,
不用考虑效率问题,
不需要考虑Kafka积压问题。
那其实,
程序容易开发很多。
代码怎么写都不会死。
但这种场景对大家的技术能力不会有提升,
在将来的面试,
即使用了Hudi、ClickHouse、Flink一些较新的技术也不会有亮点,
大多数人已经不吃这套了。
除非去的那个公司的技术人员压根一点都不懂。

一个表一个topic,还是?

现在在公司里面,Kafka应用还是很多的。
如果数据库开启了CDC,
这些CDC日志都放在一个topic中,
还是说一个表对应一个topic?
这个问题其实要看场景,会有多种选择。
还是那句话,如果表数量小,无所谓了,怎么搞都行。
但如何表数量非常多,每个表对应一个topic,会对ZK产生较大影响。
说几种设计方法供大家参考:

  1. 按照数据库来组织,一个库的CDC都放在一个topic中。
  2. 如果按照库来组织依然很大,可以启动多个采集示例,每个示例对应一个表匹配模式,然后表匹配模式将CDC日志推入到不同的topic。这种方式适合大规模的CDC日志,控制起来比较灵活。

    乱序问题,不处理试试?

    CDC日志一定是有序的。我给大家举个例子就知道了。假设执行以下几个DML操作:

  3. INSERT

  4. UPDATE
  5. DELETE

大家想一下,如果顺序出现错乱是导致什么问题?例如,因为网络延迟,进入到Kafka中的数据变成了:

  1. INSERT
  2. DELETE
  3. UPDATE

看出来了吗?当第三次去UPDATE的时候,数据已经被清理掉了,如何UPDATE?
再说一个更严重的,假设Kafka CDC日志的有多个分区,且生产者端写入策略是轮询方式。会出现问题?
我们发现,CDC日志全都是乱序的。我们将来看到的数据也都是错的。
有几种办法给大家做参考:

  1. Kafka的topic仅设置一个分区。这种方式在表数量、数据量不是太大是可行的。大规模数据量,拉取Kafka的数据会出现瓶颈。
  2. 自定义Kafka生产策略。例如:按照库名、表名、甚至是特征字段来分区。针对一些并发特别高的表,我们甚至需要有针对性的设计写入策略。例如:以表名、以及一个完整业务流程作为分区方式。
  3. 轮询写入Kafka,避免倾斜、最大并发化,在Kafka中不考虑乱序问题。在处理引擎拉取到数据,在处理之前先按照指定时间戳字段排序。

    CDC流应用写入Hudi优化

    大家如果在跑数百张表的数据CDC到Hudi。
    你会惊奇地发现,这跟跑几张表的DEMO完全不是一码事。
    就是特别的慢。并行度特别高的情况,HDFS的负载也是特别高。
    上百张表如果不去优化,想要跑出来一个不过的效果,轻轻松松吃掉集群几个TB的资源。
    所以,我需要来跟大家聊聊我在设计流程序过程中给应用做的优化。

    不做cache,自取灭亡

    我们都知道Spark是基于DAG来进行stage调度,然后在基于TaskSceduler调度一个个任务的。
    因为我们需要对CDC日志进行解析、验证以及转换处理。
    所以,每一次计算都有可能会导致从源头重新拉取数据。
    我们的CDC程序中要刷入上百张Hudi表,兄弟如果你没有做cache,这意味着:
    Streaming程序需要从Kafka重复拉取上百次数据
    如果有上千张表就更恐怖了。
    大家可以自己去测试一下,在落地到表之前,不做cache的后果。
    Kafka的topic中的数据是很大的,单个topic几十亿、上百亿的消息是正常水平。
    大家可能会说,没事啊。
    Kafka的吞吐超级高,
    但考虑一下,吞吐再高,也经不住这样重复消费数据。
    而且Kafka的吞吐会受到服务器IO的影响。
    如果Kafka没有做限流,
    一旦Kafka负载过高,导致其他的系统也无法正常生产、消费Kafka的数据。
    一首《凉凉》自己唱吧。

    一次计算,扫描数百GB的缓存

    开启了Structured Streaming的cache后,
    然后我们发现Kafka的负载下降了很多。
    高兴坏了。
    然后,发现每次刷入数据到Hudi时,光读取数据就要几分钟。
    看了一下DAG,
    确实不再从Kafka直接拉数据,
    而是从cache中拉取数据,
    这个cache也不小呢,每次Batch cache几十GB、上百GB。
    每次对表做一次计算,都需要从扫描整个cache。
    那么有几百表,
    这个cache就需要被扫描几百次,
    我需要让每个表后续的计算尽量读取少一些数据。
    所以,我在基于batch的cache的基础之上。
    再次做了一个针对表的二级缓存。
    后续针对表的操作,直接拉取到的二级缓存就只拉取自己的数据即可。
    总结下:
  • 一级缓存,就是Structured Streaming一个batch的cache,包含了所有表的cdc日志。一级缓存要解决的是Kafka重复消费问题。
  • 二级缓存,Hudi表缓存。针对每个表,都会单独做一个cache,避免每个针对表的操作都需要重新读取一次一级缓存,从而提升读取效率。

    单线程调度,就等着Kafka丢数吧

    刷几张Hudi表,
    我们会发现,好快哦!好High哦!
    image-20210913232847124
    但是随着刷入的表越来越多,
    发现Structured Streaming写入Hudi越来越慢。
    而且你发现,Spark的任务并发没有利用好。
    明明有几百个container,
    并行的任务却只有几十个。
    一个个的表地写。
    所以,根据实践,
    我们可以判断在foreachBatch中,Spark是单线程调度。
    我们有几百张表需要刷入到Hudi中。
    一个个表刷显然太不现实了。
    刷入的数据太慢,
    Kafka进数非常快,这就会导致,当我们正在消费某个数据。
    Kafka积压的数据太多了,
    所以触发了清理操作。
    然后数据还没有被数据就丢掉了。
    所以,根据实践,
    我们需要自己来实现多线程调度,
    你会用到Java的并发包,
    然后一次将数据刷入若干个Hudi表中。
    至少,一次启用几十个线程来刷Hudi表是没有问题的。

    不要让所有表都写放大

    在开发环境,调通了一个表的CDC日志解析后。
    看见 Structured Streaming 能够即时将数据正确地刷入到Hudi。
    天哪!历经困难重重,终于把数据刷到湖仓里面。
    打开Spark SQL的cli,数据也能够正确的查询查询出来,统一hoodie_record_key对应的数据也能正确更新。
    所以,我高兴地将Maven Profile切换到prod。
    准备到准生产做一个验证。
    几分钟地等待,
    Maven把所有的shell、python、配置文件打包到了一个tar.gz。
    废了九牛二虎之力,
    将tar.gz包上传到准生产。
    将要刷入LakeHouse的目标表元数据初始化好。
    模拟上线大概几百张目标表。
    当YARN把Streaming应用拉起来的时候,
    我就发现有点不妙。
    这些个表,
    跑一次batch发现Web UI就展示了几千个Stage。
    准生产的HDFS集群负载一下飚满。
    您猜怎么招?
    Hudi要处理小文件,
    就需要检查HDFS上的文件,
    并且将小文件合并。
    是不是感觉似曾相识?
    我肯定你在Kudu、HBase等LSM结构的Compaction中见过。
    写放大。
    是不是慢点就慢点?
    大不了数据就延迟大点。
    不!
    这样的写放大,
    HDFS负载会猛增,
    其他的任务还要不要玩?
    还有,你确定Kafka会一直保存那些被积压的数据吗?
    Log Compaction和Log Deletion会是摆设?
    所以,这程序如果这样,
    熬不了一天,在半夜业务库刷数的时候,就会直接因为Kafka数据丢失导致应用退出。
    神马?
    不退?
    任何人都无法保证最终的数据是正确的。
    耶稣都保不住,我说的。
    你说:是不是该去调Spark、Hudi参数了?
    大可以去试试,
    在资源有限的情况下,
    有很大可能会无功而返。
    我问个问题:业务库的表中是不是每个表无时无刻都在刷数?
    我想,95%的业务系统不会。
    业务库中一定会有一些表是缓慢变化的。
    而针对缓慢变化的业务表,根本没有必要每个Batch都去检查小文件、合并。
    所以,每当在将数据刷入目标表之前。请一定要检查,当前这个Batch中有哪些目标表需要刷数。
    这一期先写到这里啦。
    下回我们接着聊基于Hudi的LakeHouse生产实践。