Hive Hook & Bridge

1. Hive 模型

Hive模型包含以下类型(types):

  • Entity types:
    • hive_db
      • super-types: Asset
      • attributes: qualifiedName, name, description, owner, clusterName, location, parameters, ownerName
    • hive_table
      • super-types: DataSet
      • attributes: qualifiedName, name, description, owner, db, createTime, lastAccessTime, comment, retention, sd, partitionKeys, columns, aliases, parameters, viewOriginalText, viewExpandedText, tableType, temporary
    • hive_column
      • super-types: DataSet
      • attributes: qualifiedName, name, description, owner, type, comment, table
    • hive_storagedesc
      • super-types: Referenceable
      • attributes: qualifiedName, table, location, inputFormat, outputFormat, compressed, numBuckets, serdeInfo, bucketCols, sortCols, parameters, storedAsSubDirectories
    • hive_process
      • super-types: Process
      • attributes: qualifiedName, name, description, owner, inputs, outputs, startTime, endTime, userName, operationType, queryText, queryPlan, queryId, clusterName
    • hive_column_lineage
      • super-types: Process
      • attributes: qualifiedName, name, description, owner, inputs, outputs, query, depenendencyType, expression
  • Enum types:
    • hive_principal_type
      • values: USER, ROLE, GROUP
  • Struct types:
    • hive_order
      • attributes: col, order
    • hive_serde
      • attributes: name, serializationLib, parameters

使用唯一属性qualifiedName在Atlas中创建和删除Hive实体,其值应格式如下所述。请注意,dbName,tableName和columnName应为小写。

  1. hive_db.qualifiedName: <dbName>@<clusterName>
  2. hive_table.qualifiedName: <dbName>.<tableName>@<clusterName>
  3. hive_column.qualifiedName: <dbName>.<tableName>.<columnName>@<clusterName>
  4. hive_process.queryString: trimmed query string in lower case

2. Hive Hook

Atlas Hive hook向Hive注册以监听创建/更新/删除操作,并通过Kafka通知更新Atlas中的元数据,以了解Hive中的更改。按照以下说明在Hive中设置Atlas hook:

  • 通过添加以下内容在hive-site.xml中设置Atlas hook:
  1. <property>
  2. <name>hive.exec.post.hooks</name>
  3. <value>org.apache.atlas.hive.hook.HiveHook</value>
  4. </property>
  • 解压apache-atlas-${project.version}-hive-hook.tar.gz
  • 将文件夹apache-atlas-hive-hook-${project.version}/hook/hive的全部内容复制到<atlas package>/hook/hive
  • 在hive配置的hive-env.sh中添加’export HIVE_AUX_JARS_PATH=/hook/hive’
  • `<atlas-conf>/atlas-application.properties复制到hive conf目录。

atlas-application.properties中的以下属性控制线程池和通知详细信息:

  1. atlas.hook.hive.synchronous=false # whether to run the hook synchronously. false recommended to avoid delays in Hive query completion. Default: false
  2. atlas.hook.hive.numRetries=3 # number of retries for notification failure. Default: 3
  3. atlas.hook.hive.queueSize=10000 # queue size for the threadpool. Default: 10000
  4. atlas.cluster.name=primary # clusterName to use in qualifiedName of entities. Default: primary
  5. atlas.kafka.zookeeper.connect= # Zookeeper connect URL for Kafka. Example: localhost:2181
  6. atlas.kafka.zookeeper.connection.timeout.ms=30000 # Zookeeper connection timeout. Default: 30000
  7. atlas.kafka.zookeeper.session.timeout.ms=60000 # Zookeeper session timeout. Default: 60000
  8. atlas.kafka.zookeeper.sync.time.ms=20 # Zookeeper sync time. Default: 20

可以通过在配置名称前加上“atlas.kafka”前缀来指定Kafka通知生成器的其他配置。有关Kafka制作人支持的配置列表,请参阅Kafka Producer Configs

3. 列级血缘

从0.8-incubing版本的Atlas开始,在Atlas支持获取列级血缘。以下是详细信息:

3.1 模型

  • ColumnLineageProcess类型是Process的子类型
  • 这将输出列与一组输入列或输入表相关联
  • 血缘信息也捕获了依赖性,如下所示:
    • SIMPLE:输出列与输入值相同
    • EXPRESSION:输出列在输入列的运行时(例如,Hive SQL表达式)由某个表达式转换。
    • SCRIPT:输出列由用户提供的脚本转换。
  • 在EXPRESSION依赖的情况下,expression属性包含字符串形式的表达式
  • 由于Process链接输入和输出DataSet,因此Column是DataSet的子类型

3.2 示例

下面是一个简单的CTAS示例:

  1. create table t2 as select id, name from T1

捕获到的血缘如下: column_lineage_ex1

3.3 从Hive命令中提取Lineage

  • HiveHook将HookContext中的LineageInfo映射到Column lineage实例
  • Hive中的LineageInfo为最终的FileSinkOperator提供列级别的血缘,将它们链接到Hive查询中的输入列

4. 注意

  • 在将HIVE-13112的修补程序应用于Hive源之后,列级别谱系与Hive版本1.2.1一起使用
  • 由于数据库名称,表名称和列名称在配置单元中不区分大小写,因此实体中的相应名称是小写的。因此,在查询实体名称时,任何搜索API都应使用小写
  • 当前由hive hook捕获以下配置单元操作
    • create database
    • create table/view, create table as select
    • load, import, export
    • DMLs(insert)
    • alter database
    • alter table(倾斜的表信息,存储为,不支持保护)
    • alter view

5. 导入Hive元数据

Apache Atlas提供了一个命令行脚本import-hive.sh,用于将Apache Hive数据库和表的元数据导入Apache Atlas。该脚本可用于使用Apache Hive中的数据库/表初始化Apache Atlas。此脚本支持导入特定表的元数据,特定数据库中的表或所有数据库和表。

  1. Usage 1: <atlas package>/hook-bin/import-hive.sh
  2. Usage 2: <atlas package>/hook-bin/import-hive.sh [-d <database regex> OR --database <database regex>] [-t <table regex> OR --table <table regex>]
  3. Usage 3: <atlas package>/hook-bin/import-hive.sh [-f <filename>]
  4. File Format:
  5. database1:tbl1
  6. database1:tbl2
  7. database2:tbl1