一条数据在 Apache Iceberg 之旅:写过程分析
1 Driver端流程

- 首先我们代码中
spark.write 创建了 DataFrameWriter DataFrameWriter.save() 方法会依次执行图中的1-5步骤- 注意其中的
createWriter 方法会找到特定的 WriteSupport 子类去初始化writer
- 初始化时,会找到org.apache.iceberg.spark.source.IcebergSource类
- 关注其
createWriter 方法,会返回一个 org.apache.iceberg.spark.source.Writer - 这个Writer需要集成spark的DataSourceWriter,实现特定的commit abort方法
- iceberg实现了这个writer,还提供了还几种不同的子类,如Partitioned24Writer 和Unpartitioned24Writer,我们后面会讲到
- 执行DataFrameWriter的runCommand,会去执行WriteToDataSourceV2的doExecute方法
- SparkContext的的runJob,会真正启动job,下发给executor
sparkContext.runJob( rdd, //func: (TaskContext, Iterator[T]) => U, 在rdd的每个分区上都执行的方法 //这里定义的是,在每个分区中都执行DataWritingSparkTask.run方法 (context: TaskContext, iter: Iterator[InternalRow]) => DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator), // 希望跑Task的分区 rdd.partitions.indices, // resultHandler: (Int, U) => Unit) 每个分区任务执行完成后的,Driver的回调函数 // Int指是哪个分区返回的消息,U是消息, (index, message: WriterCommitMessage) => { messages(index) = message writer.onDataWriterCommit(message) } )
