I. Preparation
    1. Environment
    NiFi: 1.9.0
    HBase: 2.1.1

    2. Create the HBase test tables

    # Create table USER (simple form, default column-family settings)
    create 'USER', 'info', 'data'
    # Or create USER keeping 3 versions per column family (run only one of the two create 'USER' commands)
    create 'USER', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
    # Second table USER_02 with the same column families
    create 'USER_02', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
    # Insert into USER: row key rk0001, column family info, qualifier name, value zhangsan
    put 'USER', 'rk0001', 'info:name', 'zhangsan'
    put 'USER', 'rk0001', 'info:gender', 'female'
    put 'USER', 'rk0001', 'info:age', 20
    put 'USER', 'rk0001', 'data:pic', 'picture'
    put 'USER', 'rk0002', 'info:name', 'lisi'
    put 'USER', 'rk0002', 'info:gender', 'female'
    put 'USER', 'rk0002', 'info:age', 18
    put 'USER', 'rk0002', 'data:pic', 'picture'
    put 'USER', 'rk0003', 'info:name', 'wangwu'
    put 'USER', 'rk0003', 'info:gender', 'female'
    put 'USER', 'rk0003', 'info:age', 18
    put 'USER', 'rk0003', 'data:pic', 'picture'
    # Get all cells of row key rk0001 in USER
    get 'USER', 'rk0001'
    # Scan all rows of USER
    scan 'USER'
    # Truncate USER (delete all of its data)
    truncate 'USER'

    II. Overall flow
    [Figure: overall flow]
    III. ExecuteGroovyScript
    1. Configuration
    [Figure: ExecuteGroovyScript configuration]
    2. Script

    import java.nio.charset.StandardCharsets
    import org.apache.nifi.processor.io.OutputStreamCallback

    // TBList: dynamic property of this processor holding comma-separated table names, e.g. USER,USER_02
    String tbList = TBList.value
    if (!tbList) {
        return
    }
    // Create one FlowFile per table name
    for (String rawName : tbList.split(',')) {
        String tbName = rawName.trim()
        // Skip blank entries (e.g. "USER,,USER_02"); there is no FlowFile to route for them
        if (!tbName) {
            continue
        }
        def flowFile = session.create()
        // TBName attribute: table name used by the downstream processors
        flowFile = session.putAttribute(flowFile, 'TBName', tbName)
        // Write the table name as the content; keep the FlowFile reference returned by write()
        flowFile = session.write(flowFile, { outputStream ->
            outputStream.write(tbName.getBytes(StandardCharsets.UTF_8))
        } as OutputStreamCallback)
        // flowFile --> success
        session.transfer(flowFile, REL_SUCCESS)
    }
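The table-list handling in the script can be checked outside NiFi. The sketch below is plain Groovy with no NiFi classes; the helper name `splitTableNames` is hypothetical, and it assumes blank entries should simply be skipped, as in the script above.

```groovy
// Hypothetical helper mirroring the first script: split the comma-separated
// TBList value into table names, trimming whitespace and dropping blank entries.
List<String> splitTableNames(String tbList) {
    if (!tbList) {
        return []
    }
    // Groovy truth in findAll drops empty strings left after trim()
    return tbList.split(',').collect { it.trim() }.findAll { it }
}

println splitTableNames('USER, ,USER_02')   // [USER, USER_02]
```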

    IV. ScanHBase
    1. Configuration
    [Figure: ScanHBase configuration]
    2. Output JSON format (row rk0001 shown)

    [{
        "row": "rk0001",
        "cells": [{
            "fam": "data",
            "qual": "pic",
            "val": "picture",
            "ts": 1636511060633
        }, {
            "fam": "info",
            "qual": "age",
            "val": "20",
            "ts": 1636511059579
        }, {
            "fam": "info",
            "qual": "gender",
            "val": "female",
            "ts": 1636511059540
        }, {
            "fam": "info",
            "qual": "name",
            "val": "zhangsan",
            "ts": 1636511059469
        }]
    }]
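To see how these fields map onto HBase coordinates, the sketch below walks a shortened copy of the sample with `JsonSlurper` (the parser the second script also uses) and prints one line per cell. `describeCells` is a hypothetical helper, and the input is trimmed to two cells of the sample above.

```groovy
import groovy.json.JsonSlurper

// Shortened copy of the ScanHBase sample output above
String scanJson = '''[{"row":"rk0001","cells":[
  {"fam":"data","qual":"pic","val":"picture","ts":1636511060633},
  {"fam":"info","qual":"name","val":"zhangsan","ts":1636511059469}]}]'''

// Each array element is one row; each cell carries family, qualifier, value and timestamp
List<String> describeCells(String json) {
    def rows = new JsonSlurper().parseText(json)
    return rows.collectMany { r ->
        r.cells.collect { c -> "${r.row} ${c.fam}:${c.qual} = ${c.val}".toString() }
    }
}

describeCells(scanJson).each { println it }
// rk0001 data:pic = picture
// rk0001 info:name = zhangsan
```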

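    V. HBase_2_ClientService
    1. Configuration
    [Figure: HBase_2_ClientService configuration]
    VI. ExecuteGroovyScript
    1. Configuration
    [Figure: ExecuteGroovyScript configuration]
    2. Script (split each row into one FlowFile per cell)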

    import groovy.json.JsonOutput
    import groovy.json.JsonSlurper
    import java.nio.charset.StandardCharsets
    import org.apache.nifi.processor.io.InputStreamCallback
    import org.apache.nifi.processor.io.OutputStreamCallback

    // Fetch up to 150 FlowFiles in one batch
    def flowFileList = session.get(150)
    if (!flowFileList) return
    flowFileList.each { flowFile ->
        // Read the ScanHBase JSON output
        String content = ''
        session.read(flowFile, { inputStream ->
            content = inputStream.getText(StandardCharsets.UTF_8.name())
        } as InputStreamCallback)
        def rows = new JsonSlurper().parseText(content)
        rows.each { rowObj ->
            // Row key
            def row = rowObj.row
            rowObj.cells.each { cell ->
                def newFlowFile = session.create()
                // CF attribute: column family
                newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam)
                // Pass on TBName (the HBase table name)
                newFlowFile = session.putAttribute(newFlowFile, 'TBName', flowFile.getAttribute('TBName'))
                // One JSON object per cell, e.g. {"row":"rk0001","name":"zhangsan"}; JsonOutput escapes quotes
                String json = JsonOutput.toJson([row: row, (cell.qual): cell.val])
                newFlowFile = session.write(newFlowFile, { outputStream ->
                    outputStream.write(json.getBytes(StandardCharsets.UTF_8))
                } as OutputStreamCallback)
                // newFlowFile --> success
                session.transfer(newFlowFile, REL_SUCCESS)
            }
        }
    }
    // Route the original FlowFiles to failure (ExecuteGroovyScript has only success and failure)
    session.transfer(flowFileList, REL_FAILURE)
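The per-cell transformation in this script can be isolated and tested without a NiFi session. The sketch below is a hypothetical, NiFi-free restatement: `toCellRecords` returns one `[CF, JSON]` pair per cell, matching the `CF` attribute and the content of each new FlowFile. The `note` cell with embedded quotes is made-up test data, included to show that `JsonOutput.toJson` escapes values that hand-built string concatenation would break on.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: one ScanHBase JSON document in,
// one [column family, per-cell JSON] pair out for every cell.
List<List<String>> toCellRecords(String scanJson) {
    def rows = new JsonSlurper().parseText(scanJson)
    return rows.collectMany { r ->
        r.cells.collect { c ->
            // Same shape as the script: {"row": <row key>, <qualifier>: <value>}
            [c.fam as String, JsonOutput.toJson([row: r.row, (c.qual): c.val])]
        }
    }
}

// The "note" cell is hypothetical data containing double quotes
String sample = '''[{"row":"rk0001","cells":[
  {"fam":"info","qual":"name","val":"zhangsan","ts":1636511059469},
  {"fam":"info","qual":"note","val":"say \\"hi\\"","ts":1636511059470}]}]'''

toCellRecords(sample).each { rec -> println "${rec[0]} -> ${rec[1]}" }
```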

    VII. PutHBaseJson
    1. Configuration
    [Figure: PutHBaseJson configuration]

    VIII. Troubleshooting
    ScanHBase reported the following error:

    14:56:00 CST ERROR
    ScanHBase[id=0dbd7ef0-017d-1000-82a0-c67b6329ebff] Unable to fetch rows from HBase table HBASE DEV PI B 2016 due to
    Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException
    on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING:
    org.apache.nifi.controller.service.ControllerServiceDisabledException:
    Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException
    on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING


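    Diagnosis:
    1. The likely cause of the error above is an incorrect Zookeeper ZNode Parent path in HBase_2_ClientService. With the wrong path the client service cannot connect to HBase, so it stays in the ENABLING state and ScanHBase cannot call it.
    [Figure: Zookeeper ZNode Parent setting]
    2. The correct Zookeeper ZNode Parent value is the Zookeeper Base Path shown on the HBase Master UI.
    [Figure: Zookeeper Base Path on the HBase Master UI]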
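Besides the Master UI, the same value is the `zookeeper.znode.parent` property in the cluster's `hbase-site.xml`; when that property is not set, HBase uses `/hbase`. The sketch below reads it with the JDK's DOM parser. The helper name `znodeParent` and the sample value `/hbase-unsecure` are illustrative assumptions, not values from this setup.

```groovy
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper: return zookeeper.znode.parent from hbase-site.xml content,
// falling back to HBase's default of /hbase when the property is absent.
String znodeParent(String hbaseSiteXml) {
    def doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
            .parse(new ByteArrayInputStream(hbaseSiteXml.getBytes('UTF-8')))
    def props = doc.getElementsByTagName('property')
    for (int i = 0; i < props.getLength(); i++) {
        Element p = (Element) props.item(i)
        String name = p.getElementsByTagName('name').item(0)?.textContent?.trim()
        if (name == 'zookeeper.znode.parent') {
            return p.getElementsByTagName('value').item(0)?.textContent?.trim()
        }
    }
    return '/hbase'
}

// Example hbase-site.xml content (the value is illustrative)
def xml = '''<configuration>
  <property><name>zookeeper.znode.parent</name><value>/hbase-unsecure</value></property>
</configuration>'''
println znodeParent(xml)                 // /hbase-unsecure
println znodeParent('<configuration/>')  // /hbase
```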