一、整体流程

image.png

二、ExecuteGroovyScript

功能描述:用户输入库名列表(以逗号分隔),此脚本拆分库名并以一个库名为一个flowfile为单位传出。
输入属性:DBList
输出属性:DBName
输出内容:被拆分的数据库名(flowfile)

  1. import java.nio.charset.StandardCharsets
  2. // 获取以逗号拆分的库名字符串
  3. String DBList = DBList
  4. // 拆分库名
  5. String[] data = DBList.split(",")
  6. // 获取数据长度
  7. int length = data.length;
  8. if (length==0){
  9. return
  10. }
  11. // 循环所有库
  12. for (Integer i = 0; i < length; i++) {
  13. if (data[i]==""){
  14. session.transfer(flowFile,REL_FAILURE)
  15. continue
  16. }
  17. flowFile = session.create()
  18. // 添加属性
  19. flowFile = session.putAttribute(flowFile, 'DBName', data[i])
  20. // 写入flowFile中
  21. session.write(flowFile, {outputStream ->
  22. outputStream.write(data[i].toString().getBytes(StandardCharsets.UTF_8))
  23. } as OutputStreamCallback)
  24. // flowFile --> success
  25. session.transfer(flowFile, REL_SUCCESS)
  26. }

配置详情:
image.png

三、SelectHiveQL

功能描述:通过上一个process传入的DBName获取库下面的所有表名,以CSV格式不携表头不带引号传出
输入属性:
输出属性:DBName
输出内容:表名集合(flowfile)
配置详情:
image.png

1、HiveConnectionPool

功能描述:配置jdbc url
配置详情:
image.png

四、ExecuteGroovyScript

功能描述:通过上面获取到的库名加表名拆分组装json传出
输入属性:
输出属性:TBName、DBName
输出内容:TBName+DBName的json串(flowfile)

  1. // 导入相关类
  2. import org.apache.commons.io.IOUtils
  3. import java.nio.charset.StandardCharsets
  4. // 获取流文件
  5. flowFile = session.get()
  6. // 读取文件内容
  7. def text = ''
  8. session.read(flowFile, {
  9. inputStream ->
  10. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  11. } as InputStreamCallback
  12. )
  13. // 获取DBName
  14. def DBName = flowFile.getAttribute('DBName')
  15. if (text == '') {
  16. session.transfer(flowFile, REL_FAILURE)
  17. return
  18. }
  19. // 拆分表名
  20. String[] data = text.split("\n")
  21. // 获取数据长度
  22. int length = data.length;
  23. // 循环所有库
  24. for (Integer i = 0; i < length; i++) {
  25. newFlowFile = session.create()
  26. // 添加属性
  27. newFlowFile = session.putAttribute(newFlowFile, 'DBName', DBName)
  28. // 添加属性
  29. newFlowFile = session.putAttribute(newFlowFile, 'TBName', data[i])
  30. // 声明容器存储结果
  31. StringBuffer stringBuffer = new StringBuffer()
  32. stringBuffer.append("{\"DBName\":\""+ DBName + "\",\"TBNaem\":\"" + data[i] + "\"}")
  33. // 写入flowFile中
  34. session.write(newFlowFile, {outputStream ->
  35. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  36. } as OutputStreamCallback)
  37. // flowFile --> success
  38. session.transfer(newFlowFile, REL_SUCCESS)
  39. }
  40. // flowFile --> success
  41. session.transfer(flowFile, REL_FAILURE)

配置详情:
image.png

五、SelectHiveQL

功能描述:通过获取到的库名加表名查询数据,以Avro格式输出。
输入属性:
输出属性:
输出内容:flowfile
配置详情:
image.png

六、PutHiveStreaming

功能描述:解析SelectHiveQL输出的数据,存储数据到指定表。该处理器用于向hive表写 数据,数据要求是avro格式,要求使用者熟练使用hive。通过 thrift nifi连hive的问题有点复杂,Apache版NIFI对应的Apache版hive,HDP版NIFI对应的HDP版hive。连接HDP版hive时NIFI运行环境需配置hive HDFS的相关hosts,并且运行NIFI 的用户拥有hive表的读写权限。此处理器hive支持的版本为1.2.1,不支持hive2.x,hive3.x则使用别的处理器。
输入属性:
输出属性:
输出内容:flowfile
配置详情:
image.png
示例说明:
1、从数据库读取数据写入hive表(无分区),Apache NIFI 1.8 - Apache hive 1.2.1
建表语句:

  1. CREATE TABLE test_03(table_name string)
  2. CLUSTERED BY (table_name) INTO 5 BUCKETS
  3. STORED AS ORC
  4. TBLPROPERTIES('transactional'='true');

2、hive表只能是ORC格式。
3、默认情况下(1.2及以上版本)建表使用SQL2011关键字会报错,如果弃用保留关键字,还需另做配置。建表时必须指明transactional = “true”,建表时需”clustered by (colName) into (n) buckets”。详情请查看hive streaming 官方文档(https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest

七、问题

1、FAILED:SemanticException

image.png
FAILED:SemanticException [错误10265]:在具有非ACID事务管理器的ACID表test_db.test_table上不允许此命令。命令失败:null
4.1、必須将Hive事务管理器设置为org.apache.hadoop.hive.ql.lockmgr.DbTxnManager才能使用ACID表。

  1. SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

Additionally, Set these properties to turn on transaction support
4.2、此外,设置这些属性,以打开事务支持
Client Side

  1. SET hive.support.concurrency=true;
  2. SET hive.enforce.bucketing=true;
  3. SET hive.exec.dynamic.partition.mode=nonstrict;

Server Side (Metastore)
服务器端(Metastore)

  1. SET hive.compactor.initiator.on=true;
  2. SET hive.compactor.worker.threads=1;

参考:
https://www.itdaan.com/tw/787e68af21a78a511ccd5f3395fc79c6
https://blog.csdn.net/qq_25794453/article/details/78412338

2、UnknownHostException:nameservice1

配置/etc/hosts
image.png

3、PutHiveStreaming 报错:

  1. org.apache.nifi.util.hive.HiveWriterSConnectFailure:Failure connecting EndPoint ({metaStoreUri = ' thrift://10.65.80.2:9083',database='guohua',table="dwd_mm_gh_org_department_b", partitionVals=[2018]}

原因:目标段hive版本为1.1,nifi端hive依赖为1.2。版本不兼容

CDH本地有hive1.1的包和1.2的包,分别依赖nifi-hive-nar.jar(hive1.2)、nifi-hive-1_1-nar.jar(hive1.1),但是观察PutHiveStraming,使用的依赖nar为hive1.2的。
image.png
解决方案:
1、到${NIFI}\lib目录copy hive1.1和hive1.2的包

  1. cd /opt/cloudera/parcels/CFM/NIFI/lib/
  2. mkdir bak
  3. cp nifi-hive* bak/
  4. cd bak/
  5. ll

image.png
2、解压两个包分别为META-INF(2版本)、META-INF_1(1版本)

  1. unzip nifi-hive_1_1-nar-1.9.0.1.0.1.0-12.nar
  2. mv META-INF META-INF_1
  3. unzip nifi-hive-nar-1.9.0.1.0.1.0-12.nar
  4. ll

image.png
image.png
3、新建两个目录移动jar包

  1. mkdir jar1 jar2
  2. ll

image.png
4、移动nar1和2的hive-.jar 和hadoop-mapreduce-client-.jar 分别到jar1和jar2
image.png
5、额外移动htrace-*.jar
image.png
6、查看两个jar
image.png
7、把jar1的jar包移动到META-INF(hive2)

  1. cp jar1/* META-INF/bundled-dependencies/

8、压缩META-INFO

  1. -- 压缩之前先备份nifi-hive-narhive2
  2. mv nifi-hive-nar-1.9.0.1.0.1.0-12.nar nifi-hive-nar-1.9.0.1.0.1.0-12.nar.bak
  3. zip -r -q nifi-hive-nar-1.9.0.1.0.1.0-12.nar META-INF

9、移动替换后到jar包到${NIFI_HOME}\lib,覆盖原来hive2依赖的nar

  1. mv nifi-hive-nar-1.9.0.1.0.1.0-12.nar /opt/cloudera/parcels/CFM/NIFI/lib/

image.png
10、重启nifi,问题解决。