Overview

The official documentation is incomplete; there is essentially none for Presto or Flink.

Basics

At its core, Hudi still reads and writes files on HDFS, and relies on Hive to store and retrieve table metadata.

1. Spark Usage

Currently only the DataFrame API is supported.

  // assumes an active SparkSession named spark; see the Spark configuration section below
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.QuickstartUtils._
  import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
  import org.apache.spark.sql.SaveMode
  import spark.implicits._

  val tableName = "luna_ods"                 // Hudi table name, also used as the Hive table name
  val basePath  = "/path/to/hudi/luna_ods"   // placeholder HDFS base path of the table

  val df = Seq(
    (1, "foo", "2020/04/04", System.currentTimeMillis()),
    (2, "bar Value", "2020/04/04", System.currentTimeMillis())
  ).toDF("id", "name", "dt", "ts")

  df.write.format("hudi")
    .options(getQuickstartWriteConfigs)
    .option(RECORDKEY_FIELD_OPT_KEY, "id")           // record key
    .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")       // partition field
    .option(TABLE_NAME, tableName)
    .option(HIVE_SYNC_ENABLED_OPT_KEY, true)         // sync metadata to Hive after the write
    .option(HIVE_DATABASE_OPT_KEY, "luna_hudi")
    .option(HIVE_TABLE_OPT_KEY, tableName)
    .option(HIVE_URL_OPT_KEY, "jdbc:hive2://foo:10000")
    .option(HIVE_USER_OPT_KEY, "admin")
    .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
    .mode(SaveMode.Overwrite)
    .save(basePath)
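
Reading the data back also goes through the DataFrame API. A minimal sketch of a snapshot query against the same basePath; note that in 0.7.0 the load path needs a glob matching the partition depth (dt values like "2020/04/04" create three directory levels):

  // snapshot read of the table written above
  val snapshot = spark.read.format("hudi").load(basePath + "/*/*/*")
  snapshot.select("_hoodie_commit_time", "id", "name", "dt", "ts").show(false)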

Spark dependencies

  <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-bundle_2.11</artifactId>
    <version>0.7.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.6</version>
  </dependency>
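
For sbt-based builds, the equivalent declarations would look roughly like this (a sketch; the _2.11 suffix must match the Spark build):

  // build.sbt sketch mirroring the Maven coordinates above
  libraryDependencies ++= Seq(
    "org.apache.hudi"  % "hudi-spark-bundle_2.11" % "0.7.0",
    "org.apache.spark" %% "spark-avro"            % "2.4.6"
  )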

Spark configuration

  spark.serializer=org.apache.spark.serializer.KryoSerializer
  spark.sql.hive.convertMetastoreParquet=false
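
These settings can also be applied when constructing the SparkSession programmatically; a minimal sketch (the app name is illustrative):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("hudi-demo")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .enableHiveSupport()   // needed for Hive sync and for reading the synced Hive table
    .getOrCreate()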

Hive sync configuration

Hive sync works by creating an external table in Hive.

  HIVE_SYNC_ENABLED_OPT_KEY=true
  HIVE_DATABASE_OPT_KEY=luna_hudi
  HIVE_TABLE_OPT_KEY=luna_ods
  HIVE_URL_OPT_KEY=jdbc:hive2://foo:10000
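
These DataSourceWriteOptions constants resolve to plain hoodie.datasource.hive_sync.* string keys (as of 0.7.0), so the same settings can be expressed as a map and merged into the write options shown earlier; a minimal sketch:

  // string-key equivalents of the Hive sync constants (Hudi 0.7.0)
  val hiveSyncOpts = Map(
    "hoodie.datasource.hive_sync.enable"           -> "true",
    "hoodie.datasource.hive_sync.database"         -> "luna_hudi",
    "hoodie.datasource.hive_sync.table"            -> "luna_ods",
    "hoodie.datasource.hive_sync.jdbcurl"          -> "jdbc:hive2://foo:10000",
    "hoodie.datasource.hive_sync.username"         -> "admin",
    "hoodie.datasource.hive_sync.partition_fields" -> "dt"
  )
  // e.g. df.write.format("hudi").options(hiveSyncOpts)...save(basePath)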

2. Hive Integration

With the Hive auto-sync option enabled, Spark creates the external table directly through HiveServer2.
For that external table to be queryable, hudi-hadoop-mr-bundle must be placed in Hive's lib directory (when querying through HiveServer2) or in the Spark ThriftServer's jars directory (when querying through the ThriftServer):
https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr-bundle/0.7.0
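
Once the bundle is in place and the table has been synced, it can be queried like any other Hive table; a minimal sketch via Spark SQL, with an illustrative table name:

  // requires spark.sql.hive.convertMetastoreParquet=false (see Spark configuration above)
  spark.sql("SELECT _hoodie_commit_time, id, name, dt, ts FROM luna_hudi.luna_ods_cow").show(false)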

Spark ThriftServer does not support ALTER TABLE ... REPLACE COLUMNS, so if HIVE_URL_OPT_KEY points at a Spark ThriftServer the schema sync fails with the error below; point it at a regular HiveServer2 instead.

org.apache.hudi.hive.HoodieHiveSyncException: Failed in executing SQL ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string, _hoodie_commit_seqno string, _hoodie_record_key string, _hoodie_partition_path string, _hoodie_file_name string, id int, name string, dt string, ts bigint )
  at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:369)
  at org.apache.hudi.hive.HoodieHiveClient.updateTableDefinition(HoodieHiveClient.java:251)
  at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:188)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:136)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:94)
  at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:355)
  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:403)

  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:399)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:399)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:460)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:217)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
  at HudiEntry$.main(HudiEntry.scala:44)
  at HudiEntry.main(HudiEntry.scala)

Caused by: java.sql.SQLException: org.apache.spark.sql.catalyst.parser.ParseException:

Operation not allowed: ALTER TABLE REPLACE COLUMNS(line 1, pos 0)

== SQL ==

ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string, _hoodie_commit_seqno string, _hoodie_record_key string, _hoodie_partition_path string, _hoodie_file_name string, id int, name string, dt string, ts bigint )

^^^

  at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296)
  at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:367)
  ... 35 more

Hudi also provides an official tool for syncing Hive metadata:

https://hudi.apache.org/docs/writing_data.html#syncing-to-hive

3. PrestoDB Usage

Presto can read Hudi tables with just the following hive.properties configuration.

  connector.name=hive-hadoop2
  hive.metastore.uri=thrift://foo:9083
  hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
  hive.parquet.use-column-names=true

hive.parquet.use-column-names=true is needed to avoid type-mismatch errors such as "The column ... of table ... is declared as type bigint, but the Parquet file declares the column as type BINARY".
https://stackoverflow.com/questions/60183579/presto-fails-with-type-mismatch-errors

4. Flink Usage

Flink cannot read Hudi in real time yet. Officially there is only a utility JAR that reads from Kafka and writes to Hudi; anything beyond that, such as Flink SQL support, has to be implemented yourself on top of the source code.
https://mp.weixin.qq.com/s/d1GI1AYHUpKwz_VHd41CeA

Data Migration

Historical data files cannot be used by Hudi as-is; they must go through a one-time conversion, which carries a compute cost.

Option 1: leave historical data where it is and let Hudi manage only the new Hive partitions.

Option 2: migrate the full historical dataset into Hudi (see the sketch below).
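
For the full-migration route, one straightforward (if compute-heavy) approach is to read the existing Hive/Parquet data with Spark and bulk-insert it into a new Hudi table; a minimal sketch, with the source table name and base path purely illustrative:

  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
  import org.apache.spark.sql.SaveMode

  // hypothetical legacy Hive table being converted in one pass
  val legacy = spark.table("luna_hudi.legacy_ods")
  legacy.write.format("hudi")
    .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)  // cheaper than upsert for a one-off load
    .option(RECORDKEY_FIELD_OPT_KEY, "id")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
    .option(TABLE_NAME, "luna_ods")
    .mode(SaveMode.Overwrite)
    .save("/path/to/new/hudi/table")   // placeholder base path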

References

https://hudi.apache.org/docs/migration_guide.html

Incremental real-time pull
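
An incremental pull with Spark reads only the records committed after a given instant; a minimal sketch, with the begin instant time purely illustrative:

  import org.apache.hudi.DataSourceReadOptions._

  val incremental = spark.read.format("hudi")
    .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(BEGIN_INSTANTTIME_OPT_KEY, "20200404000000")  // pull commits after this instant
    .load(basePath)
  incremental.show(false)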