I. Preparation
1. Environment
NiFi: 1.9.0
HBase: 2.1.1
2. Create the HBase test tables
# Option 1: create the table with default column family settings
create 'USER', 'info', 'data'
# Option 2 (use instead of option 1): keep up to 3 versions per column family
create 'USER', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
create 'USER_02', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
# Insert into USER: row key rk0001, column family info, qualifier name, value zhangsan
put 'USER', 'rk0001', 'info:name', 'zhangsan'
put 'USER', 'rk0001', 'info:gender', 'female'
put 'USER', 'rk0001', 'info:age', '20'
put 'USER', 'rk0001', 'data:pic', 'picture'
put 'USER', 'rk0002', 'info:name', 'lisi'
put 'USER', 'rk0002', 'info:gender', 'female'
put 'USER', 'rk0002', 'info:age', '18'
put 'USER', 'rk0002', 'data:pic', 'picture'
put 'USER', 'rk0003', 'info:name', 'wangwu'
put 'USER', 'rk0003', 'info:gender', 'female'
put 'USER', 'rk0003', 'info:age', '18'
put 'USER', 'rk0003', 'data:pic', 'picture'
# Get all cells of row rk0001 in USER
get 'USER', 'rk0001'
# Scan all rows of USER
scan 'USER'
# Delete all data in USER
truncate 'USER'
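II. Flow Overview
ExecuteGroovyScript (one FlowFile per table name) -> ScanHBase (reads each table through HBase_2_ClientService) -> ExecuteGroovyScript (splits every row into one JSON document per cell) -> PutHBaseJson (writes the cells to HBase)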
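III. ExecuteGroovyScript
1. Configuration details: the dynamic property TBList holds the comma-separated HBase table names that the script reads.
2. Script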
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.OutputStreamCallback

// TBList is the processor's dynamic property: comma-separated HBase table names
String tbList = TBList
if (!tbList) {
    return
}
// Create one FlowFile per table name
for (String rawName : tbList.split(',')) {
    String tbName = rawName.trim()
    // Skip empty entries (e.g. from "USER,,USER_02"); there is no incoming FlowFile to route to failure
    if (!tbName) {
        continue
    }
    def flowFile = session.create()
    // TBName attribute: the HBase table name used by the downstream processors
    flowFile = session.putAttribute(flowFile, 'TBName', tbName)
    // Also write the table name as the FlowFile content
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(tbName.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
}
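The splitting step can be checked outside NiFi. The sketch below is plain Groovy with no NiFi dependencies; `splitTableNames` is a hypothetical helper name, and trimming whitespace around each name is an assumption about how the TBList value might be typed.

```groovy
// Splits a comma-separated list of HBase table names, as the script above does.
// splitTableNames is a hypothetical helper, used only for illustration.
List<String> splitTableNames(String tbList) {
    if (tbList == null) {
        return []
    }
    // trim() removes stray spaces around each name; findAll drops blank entries
    return tbList.split(',')
                 .collect { it.trim() }
                 .findAll { it }
}

println splitTableNames(' USER, ,USER_02 ')
```

Each returned name corresponds to one FlowFile with a TBName attribute in the processor.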
IV. ScanHBase
1. Configuration details
2. Output format (JSON)
[{
  "row": "rk0001",
  "cells": [{
    "fam": "data",
    "qual": "pic",
    "val": "picture",
    "ts": 1636511060633
  }, {
    "fam": "info",
    "qual": "age",
    "val": "20",
    "ts": 1636511059579
  }, {
    "fam": "info",
    "qual": "gender",
    "val": "female",
    "ts": 1636511059540
  }, {
    "fam": "info",
    "qual": "name",
    "val": "zhangsan",
    "ts": 1636511059469
  }]
}]
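To see how this JSON maps onto HBase's row key / column family / qualifier model (and compare it with the `put` commands used to create the test data), the plain Groovy sketch below rebuilds a nested map from it. `toNestedMap` is a hypothetical helper; it ignores `ts`, so if the same qualifier appears more than once, the last cell in the array wins.

```groovy
import groovy.json.JsonSlurper

// Rebuilds row key -> column family -> qualifier -> value from ScanHBase JSON output.
// toNestedMap is a hypothetical helper, not part of NiFi.
Map toNestedMap(String scanJson) {
    Map result = [:]
    new JsonSlurper().parseText(scanJson).each { r ->
        // Groovy's get(key, default) returns the existing map or inserts the default
        Map families = result.get(r.row, [:])
        r.cells.each { c ->
            Map qualifiers = families.get(c.fam, [:])
            qualifiers[c.qual] = c.val
        }
    }
    return result
}

String sample = '''[{"row": "rk0001", "cells": [
  {"fam": "data", "qual": "pic",  "val": "picture",  "ts": 1636511060633},
  {"fam": "info", "qual": "age",  "val": "20",       "ts": 1636511059579},
  {"fam": "info", "qual": "name", "val": "zhangsan", "ts": 1636511059469}]}]'''

println toNestedMap(sample)
```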
V. HBase_2_ClientService
1. Configuration details
VI. ExecuteGroovyScript
1. Configuration details
2. Script
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Fetch up to 150 FlowFiles (ScanHBase results) per invocation
def flowFileList = session.get(150)
if (!flowFileList) return

flowFileList.each { flowFile ->
    // Read the ScanHBase JSON content
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def rows = new JsonSlurper().parseText(content)
    rows.each { r ->
        // Row key of the current row
        def rowKey = r.row
        r.cells.each { cell ->
            // One FlowFile per cell; passing flowFile as parent keeps provenance lineage
            // and copies its attributes, including TBName (the HBase table name)
            def newFlowFile = session.create(flowFile)
            // CF attribute: the column family of this cell
            newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam)
            // Build {"row": <row key>, <qualifier>: <value>}; JsonOutput escapes quotes and backslashes
            String json = JsonOutput.toJson([row: rowKey, (cell.qual): cell.val])
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write(json.getBytes(StandardCharsets.UTF_8))
            } as OutputStreamCallback)
            // newFlowFile --> success
            session.transfer(newFlowFile, REL_SUCCESS)
        }
    }
}
// The original ScanHBase FlowFiles are no longer needed: route them to failure and
// auto-terminate that relationship (session.remove(flowFileList) would drop them instead)
session.transfer(flowFileList, REL_FAILURE)
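The per-cell transformation can be tested without a NiFi session. This plain Groovy sketch mirrors the loop above on a JSON string; `splitCells` is a hypothetical name, and building each document with `JsonOutput.toJson` rather than string concatenation is a choice made here so that quotes or backslashes in cell values still produce valid JSON.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// For every cell in a ScanHBase JSON array, returns the column family (the value the
// script stores in the CF attribute) and a one-cell document {"row": ..., "<qual>": "<val>"}.
// splitCells is a hypothetical helper name.
List<Map> splitCells(String scanJson) {
    def rows = new JsonSlurper().parseText(scanJson)
    return rows.collectMany { r ->
        r.cells.collect { c ->
            // A parenthesised key makes Groovy use the qualifier's value as the field name
            [cf: c.fam, json: JsonOutput.toJson([row: r.row, (c.qual): c.val])]
        }
    }
}

splitCells('[{"row":"rk0002","cells":[{"fam":"info","qual":"name","val":"lisi","ts":1}]}]').each {
    println "CF=${it.cf} content=${it.json}"
}
```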
VII. PutHBaseJson
1. Configuration details
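The configuration screenshot is not reproduced here. Assuming PutHBaseJson is set up with Row Identifier Field Name = `row`, Column Family = `${CF}` and Table Name = `${TBName}` (an assumption based on the attributes the previous script sets), each split FlowFile becomes a single cell write. The plain Groovy sketch below is a simplified model of that mapping, not NiFi's code: `toPuts` is a hypothetical helper, and it assumes the row-identifier field itself is not stored as a column.

```groovy
import groovy.json.JsonSlurper

// Simplified model (an assumption, not NiFi's implementation): the row field supplies
// the row key, and every other non-null top-level field becomes qualifier/value in
// the column family taken from the CF attribute. toPuts is a hypothetical helper.
List<List<String>> toPuts(String json, String cf, String rowField = 'row') {
    Map doc = new JsonSlurper().parseText(json) as Map
    String rowKey = doc[rowField]
    if (!rowKey) {
        throw new IllegalArgumentException("missing row field '${rowField}'")
    }
    // Each entry is [row key, column family, qualifier, value]
    return doc.findAll { k, v -> k != rowField && v != null }
              .collect { k, v -> [rowKey, cf, k, v.toString()] }
}

println toPuts('{"row":"rk0001","name":"zhangsan"}', 'info')
```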
VIII. Troubleshooting
ScanHBase error:
14:56:00 CST ERROR
ScanHBase[id=0dbd7ef0-017d-1000-82a0-c67b6329ebff] Unable to fetch rows from HBase table HBASE DEV PI B 2016 due to Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING: org.apache.nifi.controller.service.ControllerServiceDisabledException:
Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING
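Diagnosis:
1. A likely cause is a wrong ZooKeeper ZNode Parent path in HBase_2_ClientService: the controller service cannot finish connecting to HBase, so it stays in the ENABLING state and ScanHBase cannot use it.
2. Set ZooKeeper ZNode Parent to the Zookeeper Base Path shown on the HBase Master UI.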
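When the cluster's hbase-site.xml is available, the same value can also be read from its `zookeeper.znode.parent` property (HBase uses `/hbase` when it is not set; some distributions configure a different path). The plain Groovy sketch below uses only the JDK's DOM parser; `znodeParent` is a hypothetical helper, and it assumes the usual `<property><name>...</name><value>...</value></property>` layout.

```groovy
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Returns zookeeper.znode.parent from the text of an hbase-site.xml file,
// or HBase's default "/hbase" when the property is absent.
// znodeParent is a hypothetical helper name.
String znodeParent(String hbaseSiteXml) {
    def builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    def doc = builder.parse(new ByteArrayInputStream(hbaseSiteXml.getBytes('UTF-8')))
    def props = doc.getElementsByTagName('property')
    for (int i = 0; i < props.length; i++) {
        Element p = (Element) props.item(i)
        String name = p.getElementsByTagName('name').item(0)?.textContent?.trim()
        if (name == 'zookeeper.znode.parent') {
            return p.getElementsByTagName('value').item(0)?.textContent?.trim()
        }
    }
    return '/hbase'
}

println znodeParent('<configuration><property><name>zookeeper.znode.parent</name><value>/hbase-unsecure</value></property></configuration>')
```

In practice the argument would be the file's text, e.g. `new File(path).text`, where `path` is wherever the cluster keeps hbase-site.xml; the value returned should match what is entered in ZooKeeper ZNode Parent.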