一条数据在 Apache Iceberg 之旅:写过程分析

1 Driver端流程

image.png

  • 首先我们代码中 spark.write 创建了 DataFrameWriter
  • DataFrameWriter.save() 方法会依次执行图中的1-5步骤
  • 注意其中的 createWriter 方法会找到特定的 WriteSupport 子类去初始化writer
    • 初始化时,会找到org.apache.iceberg.spark.source.IcebergSource类
    • 关注其createWriter 方法,会返回一个 org.apache.iceberg.spark.source.Writer
    • 这个Writer需要集成spark的DataSourceWriter,实现特定的commit abort方法
    • iceberg实现了这个writer,还提供了还几种不同的子类,如Partitioned24Writer 和Unpartitioned24Writer,我们后面会讲到
  • 执行DataFrameWriter的runCommand,会去执行WriteToDataSourceV2的doExecute方法
  • SparkContext的的runJob,会真正启动job,下发给executor
  1. sparkContext.runJob(
  2. rdd,
  3. //func: (TaskContext, Iterator[T]) => U, 在rdd的每个分区上都执行的方法
  4. //这里定义的是,在每个分区中都执行DataWritingSparkTask.run方法
  5. (context: TaskContext, iter: Iterator[InternalRow]) =>
  6. DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator),
  7. // 希望跑Task的分区
  8. rdd.partitions.indices,
  9. // resultHandler: (Int, U) => Unit) 每个分区任务执行完成后的,Driver的回调函数
  10. // Int指是哪个分区返回的消息,U是消息,
  11. (index, message: WriterCommitMessage) => {
  12. messages(index) = message
  13. writer.onDataWriterCommit(message)
  14. }
  15. )

  • 2 Executor端流程

image.png