Overview
General notes
1. Using Hudi with Spark
Currently only the DataFrame API is supported.
// Assumes a SparkSession named `spark`, plus `tableName` and `basePath` defined elsewhere.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq(
  (1, "foo", "2020/04/04", System.currentTimeMillis()),
  (2, "bar Value", "2020/04/04", System.currentTimeMillis())
).toDF("id", "name", "dt", "ts")

df.write.format("hudi")
  .options(getQuickstartWriteConfigs)                 // quickstart defaults (shuffle parallelism, etc.)
  .option(RECORDKEY_FIELD_OPT_KEY, "id")              // record key column
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")          // partition path column
  .option(TABLE_NAME, tableName)
  .option(HIVE_SYNC_ENABLED_OPT_KEY, true)            // sync table metadata to Hive
  .option(HIVE_DATABASE_OPT_KEY, "luna_hudi")
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(HIVE_URL_OPT_KEY, "jdbc:hive2://foo:10000")
  .option(HIVE_USER_OPT_KEY, "admin")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  .mode(SaveMode.Overwrite)
  .save(basePath)
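For completeness, a minimal sketch of reading the table back with the same DataFrame API, assuming the same `spark` and `basePath`; on Hudi 0.7.0 the read path is usually given as a glob down to the leaf files, and the depth here is an assumption based on dt values like "2020/04/04" producing three partition directory levels:

// Snapshot query via the DataFrame API (path-based, no Hive required).
val hudiDf = spark.read.format("hudi").load(basePath + "/*/*/*/*")
hudiDf.select("id", "name", "dt", "ts", "_hoodie_commit_time").show(false)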
Spark dependencies
<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-bundle -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark-bundle_2.11</artifactId>
    <version>0.7.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.6</version>
</dependency>
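For sbt-based projects, the equivalent coordinates (assuming a Scala 2.11 / Spark 2.4 toolchain, matching the artifacts above) would look roughly like:

// build.sbt fragment: same bundle and spark-avro versions as the Maven snippet above.
libraryDependencies ++= Seq(
  "org.apache.hudi"  % "hudi-spark-bundle_2.11" % "0.7.0",
  "org.apache.spark" % "spark-avro_2.11"        % "2.4.6"
)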
Spark configuration
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet=false
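A minimal sketch of applying these two settings when building the session; the app name and enableHiveSupport() call are illustrative, the option names come from the lines above:

import org.apache.spark.sql.SparkSession

// Build a SparkSession with the serializer and Parquet-conversion settings Hudi expects.
val spark = SparkSession.builder()
  .appName("hudi-demo")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.hive.convertMetastoreParquet", "false")
  .enableHiveSupport()
  .getOrCreate()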
Hive sync configuration options
Hive sync works by registering an external table in Hive.
HIVE_SYNC_ENABLED_OPT_KEY=true
HIVE_DATABASE_OPT_KEY=luna_hudi
HIVE_TABLE_OPT_KEY=luna_ods
HIVE_URL_OPT_KEY=jdbc:hive2://foo:10000
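For reference, a sketch of the same options expressed with their raw configuration keys, useful when the DataSourceWriteOptions constants are not on the classpath; the key names are assumed from Hudi 0.7.0 and should be verified against your version:

// Raw-key equivalents of the OPT_KEY constants used in the write example above.
val hiveSyncOpts = Map(
  "hoodie.datasource.hive_sync.enable"   -> "true",
  "hoodie.datasource.hive_sync.database" -> "luna_hudi",
  "hoodie.datasource.hive_sync.table"    -> "luna_ods",
  "hoodie.datasource.hive_sync.jdbcurl"  -> "jdbc:hive2://foo:10000"
)
// df.write.format("hudi").options(hiveSyncOpts)...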
2. Hive Integration
With the Hive auto-sync option enabled, Spark creates the external table directly through HiveServer2 (via JDBC).
External tables
Put hudi-hadoop-mr-bundle into Hive's lib directory or into the Spark ThriftServer's jars directory, depending on whether HiveServer2 or the ThriftServer serves the queries.
https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr-bundle/0.7.0
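For reference, the bundle's Maven coordinates corresponding to the link above (the JAR itself is what gets copied into the lib/jars directory), in the same format as the earlier dependency snippets:

<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr-bundle -->
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-hadoop-mr-bundle</artifactId>
    <version>0.7.0</version>
</dependency>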
Spark ThriftServer does not support ALTER TABLE ... REPLACE COLUMNS
org.apache.hudi.hive.HoodieHiveSyncException: Failed in executing SQL ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string,_hoodie_commit_seqno string,_hoodie_record_key string,_hoodie_partition_path string,_hoodie_file_name string,id int,name string,dt string,ts bigint )
at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:369)
at org.apache.hudi.hive.HoodieHiveClient.updateTableDefinition(HoodieHiveClient.java:251)
at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:188)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:136)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:94)
at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:355)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:403)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:399)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:399)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:460)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:217)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
at HudiEntry$.main(HudiEntry.scala:44)
at HudiEntry.main(HudiEntry.scala)
Caused by: java.sql.SQLException: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: ALTER TABLE REPLACE COLUMNS(line 1, pos 0)
== SQL ==
ALTER TABLE luna_hudi.luna_ods_cow REPLACE COLUMNS(_hoodie_commit_time string,_hoodie_commit_seqno string,_hoodie_record_key string,_hoodie_partition_path string,_hoodie_file_name string,id int,name string,dt string,ts bigint )
^^^
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296)
at org.apache.hudi.hive.HoodieHiveClient.updateHiveSQL(HoodieHiveClient.java:367)
... 35 more
Official tool for syncing metadata to Hive
https://hudi.apache.org/docs/writing_data.html#syncing-to-hive
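As a sketch, the standalone sync invocation documented on that page looks roughly like the following; the JDBC URL, user, database, and table values are taken from the configuration earlier in these notes, and the password and base path are placeholders to adapt:

cd hudi-hive-sync
./run_sync_tool.sh \
  --jdbc-url jdbc:hive2://foo:10000 \
  --user admin \
  --pass <password> \
  --base-path <basePath> \
  --database luna_hudi \
  --table luna_ods \
  --partitioned-by dt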
3. Using with PrestoDB
With the following hive.properties configuration, Presto can read Hudi tables.
connector.name=hive-hadoop2
hive.metastore.uri=thrift://foo:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.parquet.use-column-names=true
The hive.parquet.use-column-names=true setting addresses the Presto type-mismatch error ("The column ... is declared as type bigint, but the Parquet file declares the column as type BINARY"); see:
https://www.google.com/search?q=presto+The+column+of+table+is+declared+as+type+bigint%2C+but+the+Parquet+file+(declares+the+column+as+type+BINARY
https://stackoverflow.com/questions/60183579/presto-fails-with-type-mismatch-errors
4. Using with Flink
Flink cannot yet read Hudi in real time. Officially there is only a utility JAR that reads from Kafka and writes to Hudi; anything beyond that has to be implemented yourself against the source code, for example Flink SQL support.
https://mp.weixin.qq.com/s/d1GI1AYHUpKwz_VHd41CeA
Data migration
Historical data files cannot be used as-is; a one-time conversion with a real compute cost is required. There are two approaches (a sketch of the full-migration path follows below):
Let Hudi manage only newly written Hive partitions
Migrate the full historical dataset to Hudi
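A minimal sketch of the full-migration path using the same DataFrame API as above: read the existing Parquet data and bulk-insert it into a new Hudi table. Here sourcePath, targetBasePath, and spark are assumed to exist, and the key/partition/precombine columns follow the schema of the earlier example.

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode

// Read the existing (non-Hudi) Parquet data and rewrite it as a Hudi table.
val historical = spark.read.parquet(sourcePath)

historical.write.format("hudi")
  .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)  // bulk_insert: one-off load of existing data
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(TABLE_NAME, "luna_ods")
  .mode(SaveMode.Overwrite)
  .save(targetBasePath)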
References
https://hudi.apache.org/docs/migration_guide.html