前提

Source/框架部分适配通过接口来抽象:

  1. public class SchemaClientFactory {
  2. // returns the schema client {@code SchemaClient} to fetch the source schema.
  3. public SchemaClient getSchemaClient(RuntimeContext runtimeContext);
  4. }
  5. interface SchemaClient {
  6. // returns the latest schema of the source
  7. // per-record 调用性能能否扛得住
  8. Schema getLatestSchema();
  9. }
  10. // 主动上报 schema event 会变得很大

变更

RowDataToHoodieFunction

  • map function 需要调用 SchemaClient#getLatestSchema 判断 schema 是否变更

  • 如果变更需要更新成员:avroSchemaconverter

StreamWriteFunction

  • write client 在写入之前,主动 check schema 是否变更,同样通过 SchemaClient#getLatestSchema来比对,需要及时更新 write config 中的 schema 为最新
  • 是否上报 schema 给 coordinator ? 待定

AppendWriteFunction

  • processElement 中 需要调用 SchemaClient#getLatestSchema 判断 schema 是否变更

  • 如果变更,需要 close 掉当前 write helper,重新 new

BootstrapOperator

  • 因为加载的离线数据,通过 TableSchemaResolver 拿最新 schema 即可

StreamWriteOperatorCoordinator 需要更新 write config 的 schema 为最新

  • 方案一:write task 主动上报
  • 方案二:coordinator 自己有办法拿到