Guodian Power Data Synchronization and Transmission Control Platform
(HBaseToHBase Flow Deployment)
Contents
I. Source Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processor
2.1 ExecuteGroovyScript(1)
2.2 ScanHBase
2.3 ExecuteGroovyScript(2)
2.4 HBaseClientService
II. Target Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processor
2.1 NiFi Flow
2.2 PutHBaseJson
III. Appendix
Appendix 1:
Appendix 2:
I. Source Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processor
2.1 ExecuteGroovyScript(1)
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.2 ScanHBase
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.3 ExecuteGroovyScript(2)
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.4 HBaseClientService
c) PROPERTIES
II. Target Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processor
2.1 NiFi Flow
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.2 PutHBaseJson
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
III. Appendix
Appendix 1:
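import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.OutputStreamCallback

// Get the comma-separated string of table names.
// TBList is expected to be supplied to the script as a processor property.
String tableList = TBList
// Split it into individual table names
String[] data = tableList.split(",")
// Nothing to do if there are no names
if (data.length == 0) {
    return
}
// Loop over all tables
for (int i = 0; i < data.length; i++) {
    String tableName = data[i]
    // Skip empty entries. No FlowFile has been created for this entry yet,
    // so there is nothing to route to failure.
    if (tableName == "") {
        log.warn("Skipping empty table name at position " + i + " in TBList")
        continue
    }
    def flowFile = session.create()
    // Add the table name as an attribute
    flowFile = session.putAttribute(flowFile, 'TBName', tableName)
    // Write the table name into the FlowFile content
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(tableName.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile -> success
    session.transfer(flowFile, REL_SUCCESS)
}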
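The split-and-skip logic of the Appendix 1 script can be tried outside NiFi. The following is a minimal sketch, not part of the deployed flow: `splitTableNames` is a hypothetical helper name, and the table names in the example are invented for illustration.

```groovy
// Hypothetical helper (not part of the deployed flow): mirrors the
// split-and-skip logic of the Appendix 1 script without a NiFi session.
List<String> splitTableNames(String tbList) {
    if (tbList == null) {
        return []
    }
    // String.split(",") drops trailing empty strings but keeps empty
    // entries at the start or in the middle, so they are filtered out here.
    String[] parts = tbList.split(",")
    return parts.findAll { it != "" }
}

// Table names below are made up for illustration.
assert splitTableNames("tb_a,tb_b,,tb_c,") == ["tb_a", "tb_b", "tb_c"]
assert splitTableNames("") == []
// Whitespace is not trimmed, matching the original script.
assert splitTableNames("tb_a, tb_b") == ["tb_a", " tb_b"]
```

Because whitespace is kept, a value such as `tb_a, tb_b` would yield a table name with a leading space, so the list is best written without spaces after the commas.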
Appendix 2:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Fetch FlowFiles in batches of up to 150
def flowFileList = session.get(150)
if (!flowFileList) return
flowFileList.each { flowFile ->
    if (!flowFile) { return }
    // Read the FlowFile content as a UTF-8 string
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def data = new JsonSlurper().parseText(content)
    data.each { list ->
        // Get the rowkey
        def row = list.row
        list.cells.each { cell ->
            def newFlowFile = session.create()
            // Add attribute CF (column family name)
            newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam)
            // Pass along attribute TBName (HBase table name)
            newFlowFile = session.putAttribute(newFlowFile, 'TBName', flowFile.getAttribute('TBName'))
            // Build the JSON for this cell. JsonOutput escapes quotes and other
            // special characters, which plain string concatenation did not.
            String json = JsonOutput.toJson([row: row as String, (cell.qual as String): cell.val as String])
            // Write it into newFlowFile
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write(json.getBytes(StandardCharsets.UTF_8))
            } as OutputStreamCallback)
            // newFlowFile -> success
            session.transfer(newFlowFile, REL_SUCCESS)
        }
    }
}
// The original FlowFiles have been split into per-cell FlowFiles above;
// original flowFiles -> failure
session.transfer(flowFileList, REL_FAILURE)
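The per-cell transformation in the Appendix 2 script can also be checked without a NiFi session. The sketch below is an illustration under assumptions: `splitRowsIntoCells` is a hypothetical helper name, the sample rows are invented, and the input shape (an array of rows with `row`, `cells`, `fam`, `qual`, `val`) is taken from the fields the script itself reads.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper (not part of the deployed flow): turns row-oriented JSON
// into one small JSON document per cell, as the Appendix 2 script does.
List<Map> splitRowsIntoCells(String content) {
    def rows = new JsonSlurper().parseText(content)
    def result = []
    rows.each { row ->
        row.cells.each { cell ->
            result << [
                cf  : cell.fam,  // would become the 'CF' attribute
                json: JsonOutput.toJson([row: row.row, (cell.qual): cell.val])
            ]
        }
    }
    return result
}

// Sample input is invented for illustration only.
def sample = '''[
  {"row": "r1", "cells": [
    {"fam": "info", "qual": "name", "val": "pump-01"},
    {"fam": "stat", "qual": "temp", "val": "36.5"}
  ]}
]'''

def cells = splitRowsIntoCells(sample)
assert cells.size() == 2
assert cells[0].cf == 'info'
assert cells[0].json == '{"row":"r1","name":"pump-01"}'
cells.each { println "${it.cf} -> ${it.json}" }
```

Each resulting document holds the rowkey plus a single column, and the column family travels separately (here as `cf`, in the flow as the `CF` attribute).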