从概念上讲,Hudi 在 DFS 上物理存储一次数据,同时提供 3 种不同的查询方式。

当表同步到 Hive metastore 后,它会提供由 Hudi 的自定义输入格式支持的外部 Hive 表。 一旦提供了适当的 hudi 包,就可以通过流行的查询引擎(如 Hive、Spark SQL、Spark Datasource API 和 PrestoDB)来查询该表。

Spark Datasource

Spark Datasource API 是创建 Spark ETL 管道的一种流行方式。 Hudi 表可以通过 Spark datasource 使用简单的 spark.read.parquet 进行查询。 有关 Spark datasource 读取查询的更多示例,请参阅 Spark 快速入门

要设置 Spark 以查询 Hudi,请参阅查询引擎设置页面。

快照查询

检索当前时间点的数据表。

  1. val hudiIncQueryDF = spark
  2. .read()
  3. .format("hudi")
  4. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
  5. .load(tablePath)

增量查询

spark pipeline 特别感兴趣的是 Hudi 支持增量查询的能力,如下所示。 示例增量查询将获取自 beginInstantTime 以来写入的所有记录,如下所示。 由于 Hudi 对记录级更改流的支持,这些增量管道通常通过仅处理更改的记录来提供比批处理高 10 倍的效率。

以下代码段显示了如何获取在 beginInstantTime 之后更改的所有记录并在它们上运行一些 SQL。

  1. Dataset<Row> hudiIncQueryDF = spark.read()
  2. .format("org.apache.hudi")
  3. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
  4. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), <beginInstantTime>)
  5. .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY(), "/year=2020/month=*/day=*") // Optional, use glob pattern if querying certain partitions
  6. .load(tablePath); // For incremental query, pass in the root/base path of table
  7. hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental")
  8. spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

示例,请参阅 Spark 快速入门中的增量查询。 请参阅配置部分,查看所有数据源选项。

此外,HoodieReadClient 使用 Hudi 的隐式索引提供以下功能。

API Description
read(keys) 将 key 对应的数据读出为 DataFrame,使用Hudi自带的索引,查找速度更快
filterExists() 从提供的 RDD[HoodieRecord] 中过滤掉已经存在的记录。 用于重复数据删除
checkExists(keys) 检查提供的键是否存在于 Hudi 表中

Spark SQL

将 Hudi 表注册到 Hive metastore 后,就可以使用 Spark-Hive 集成来查询它们。 默认情况下,Spark SQL 在从 Hive metastore parquet 表中读取时,将尝试使用自己的 parquet reader 而不是 Hive SerDe。 以下是查询 COW 或 MOR 表时要考虑的重要设置。

COW 表

对于 COW 表,Spark 的默认 parquet reader 可用于保留 Spark 内置优化,用于读取 Parquet 文件,例如在 Hudi Hive 表上进行矢量化读取。 如果使用默认 parquet reader,则需要将路径过滤器推送到 sparkContext 中,如下所示。

  1. spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);

MOR 表

使用 Hudi 0.9.0+ 版本查询 MOR 表不需要特殊配置。

如果使用 Hudi 版本 <= 0.8.0 查询 MOR 表,则需要通过设置:spark.sql.hive.convertMetastoreParquet=false 来关闭 SparkSQL 默认 parquet reader。

  1. $ spark-shell --driver-class-path /etc/hive/conf --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 --conf spark.sql.hive.convertMetastoreParquet=false
  2. scala> sqlContext.sql("select count(*) from hudi_trips_mor_rt where datestr = '2016-10-02'").show()
  3. scala> sqlContext.sql("select count(*) from hudi_trips_mor_rt where datestr = '2016-10-02'").show()

Flink SQL

将 Hudi 表注册到 Flink catalog 后,就可以使用 Flink SQL 查询 Hudi 表。 它支持两种 Hudi 表类型的所有查询类型,再次依赖于 Hudi 的自定义输入格式,如 Hive。 请按照 Flink 快速入门中的说明添加 hudi-flink-bundle。

默认情况下,Flink SQL 在读取 Hive metastore parquet 表时会尝试使用自己的 parquet reader 而不是 Hive SerDe。

  1. # HADOOP_HOME is your hadoop root directory after unpack the binary package.
  2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
  3. ./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
  1. -- this defines a COPY_ON_WRITE table named 't1'
  2. CREATE TABLE t1(
  3. uuid VARCHAR(20), -- you can use 'PRIMARY KEY NOT ENFORCED' syntax to specify the field as record key
  4. name VARCHAR(10),
  5. age INT,
  6. ts TIMESTAMP(3),
  7. `partition` VARCHAR(20)
  8. )
  9. PARTITIONED BY (`partition`)
  10. WITH (
  11. 'connector' = 'hudi',
  12. 'path' = 'table_base+path'
  13. );
  14. -- query the data
  15. select * from t1 where `partition` = 'par1';

Flink 内置的 parquet 用于 COW 和 MOR 表,如果在过滤器中指定了分区路径,则 Flink 引擎会在内部应用分区修剪。 尚不支持过滤器下推(已在 roadmap)。

对于 MOR 表,为了将 hudi 表作为流式查询,你需要添加选项 'read.streaming.enabled' = 'true',当查询表时,Flink 流式管道启动并且永远不会结束,直到用户手动取消作业 。 你可以使用选项 read.streaming.start-commit 指定开始提交,并使用选项 read.streaming.check-interval 指定源监视间隔。

Hive

要设置 Hive 以查询 Hudi,请参阅查询引擎设置页面。

HiveIncrementalPuller 允许通过 HiveQL 从大型事实/维度表中增量提取更改,结合了 Hive(可靠地处理复杂的 SQL 查询)和增量原语(以增量方式而不是完全扫描来加速查询表)的优点。 该工具使用 Hive JDBC 运行 hive 查询并将其结果保存在临时表中,这个表以后可以被插入更新。 Upsert 实用程序 (HoodieDeltaStreamer) 从目录结构中获取所需的所有状态,以了解目标表上的提交时间。 例如:/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}。注册的 Delta Hive 表的格式为 {tmpdb}.{source_table}_{last_commit_included}

以下是 HiveIncrementalPuller 的配置选项

配置 描述 默认值
hiveUrl 要连接的Hive Server 2的URL
hiveUser Hive Server 2 用户名
hivePass Hive Server 2 密码
queue YARN 队列名称
tmp DFS 中存储临时增量数据的目录。目录结构将遵循约定。请参阅以下部分。
extractSQLFile 在源表上要执行的提取数据的SQL。提取的数据将是自特定时间点以来更改的所有行。
sourceTable 源表名称。需要在 Hive 环境属性中设置。
sourceDb 源数据库名称。 需要在 Hive 环境属性中设置。
targetTable 目标表名称。中间存储目录结构需要。
targetDb 目标表的数据库名称。
tmpdb 用来创建中间临时增量表的数据库 hoodie_temp
fromCommitTime 这是最重要的参数。 这是从中提取更改记录的时间点。
maxCommits 要包含在查询中的提交数。
将此设置为-1将包括从fromCommitTime 开始的所有提交。将此设置为大于 0 的值,将包括在 fromCommitTime 之后仅更改指定提交次数的记录。如果你需要一次赶上两次提交,则可能需要这样做。
3
help 实用程序帮助

设置 fromCommitTime=0 和 maxCommits=-1 将提取整个源数据集,可用于启动 Backfill。 如果目标表是 Hudi 表,则该实用程序可以确定目标表是否没有提交或延迟超过 24 小时(这是可配置的), 它将自动使用 Backfill 配置,因为增量应用最近 24 小时的更改会比 Backfill 花费更多的时间。 该工具当前的限制是不支持以混合模式(快照和增量模式)自连接同一张表。

关于使用 Fetch 任务执行的 Hive 增量查询的注意事项:由于 Fetch 任务对每个分区调用 InputFormat.listStatus(),因此每个 listStatus() 调用都会列出 Hoodie 元数据。 为了避免这种情况,使用 hive 会话属性对增量查询禁用 Fetch 任务可能很有用: set hive.fetch.task.conversion=none; 这将确保为 Hive 查询选择 Map Reduce 执行,该查询合并了分区(逗号分隔)并且只对所有这些分区调用一次 InputFormat.listStatus()。

PrestoDB

要设置 Trino 以查询 Hudi,请参阅查询引擎设置页面。

Trino

要设置 Trino 以查询 Hudi,请参阅查询引擎设置页面。

Impala (3.4 or later)

Impala 能够将 Hudi COW 表作为 HDFS 上的 EXTERNAL TABLE 进行查询。

在 Impala 上创建 Hudi 读取优化表:

  1. CREATE EXTERNAL TABLE database.table_name
  2. LIKE PARQUET '/path/to/load/xxx.parquet'
  3. STORED AS HUDIPARQUET
  4. LOCATION '/path/to/load';

Impala 能够利用物理分区结构来提高查询性能。 要创建分区表,文件夹应遵循命名约定,如 year=2020/month=1。 Impala 使用 = 分隔分区名称和分区值。
在 Impala 上创建分区 Hudi 读取优化表:

  1. CREATE EXTERNAL TABLE database.table_name
  2. LIKE PARQUET '/path/to/load/xxx.parquet'
  3. PARTITION BY (year int, month int, day int)
  4. STORED AS HUDIPARQUET
  5. LOCATION '/path/to/load';
  6. ALTER TABLE database.table_name RECOVER PARTITIONS;

Hudi 进行新的提交后,刷新 Impala 表以获取最新结果:

  1. REFRESH database.table_name

支持矩阵

写时复制表(COW)

查询引擎 快照查询 增量查询
Hive Y Y
Spark SQL Y Y
Spark Datasource Y Y
Flink SQL Y N
PrestoDB Y N
Trino Y N
Impala Y N

:::info 注意⚠️
读优化查询不适用于 COW 表。 :::

读时合并表(MOR)

查询引擎 快照查询 增量查询 读优化查询
Hive Y Y Y
Spark SQL Y Y Y
Spark Datasource Y Y Y
Flink SQL Y Y Y
PrestoDB Y N Y
Trino N N Y
Impala N N Y