国电电力数据同步传输管控平台
(HBaseToHBase流程部署)
目录
一、源端 3
1、详细流程结构 3
1.1 外层 3
1.2 内层 3
2、Processor 4
2.1 ExecuteGroovyScript(1) 4
2.2 ScanHBase 5
2.3 ExecuteGroovyScript(2) 7
2.4 HBaseClinetService 8
二、目标端 9
1、详细流程结构 9
1.1 外层 9
1.2 内层 9
2、Processor 10
2.1 NiFi Flow 10
2.2 PutHBaseJson 10
三、附页 12
附1: 12
附2: 13

一、源端

1、详细流程结构

1.1 外层

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图1

1.2 内层

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图2

2、Processor

2.1 ExecuteGroovyScript(1)

a) SETTINGS

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图3

b) SCHEDULING

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图4

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图5
d) 脚本内容见 附1

2.2 ScanHBase

a) SETTINGS

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图6

b) SCHEDULING

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图7

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图8

2.3 ExecuteGroovyScript(2)

a) SETTINGS

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图9

b) SCHEDULING

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图10

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图11

2.4 HBaseClinetService

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图12
05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图13

二、目标端

1、详细流程结构

1.1 外层

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图14

1.2 内层

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图15

2、Processor

2.1 NiFi Flow

a) SETTINGS

b) SCHEDULING

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图16

2.2 PutHBaseJson

a) SETTINGS

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图17

b) SCHEDULING

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图18

c) PROPERTIES

05_国电电力数据同步传输管控平台-HBaseToHBase流程部署v1.0 - 图19

三、附页

附1:

import java.nio.charset.StandardCharsets
// 获取以逗号拆分的库名字符串
String TBList = TBList
// 拆分库名
String[] data = TBList.split(“,”)
// 获取数据长度
int length = data.length;
if (length==0){
return
}
// 循环所有库
for (Integer i = 0; i < length; i++) {
if (data[i]==””){
session.transfer(flowFile,REL_FAILURE)
continue
}
flowFile = session.create()
// 添加属性
flowFile = session.putAttribute(flowFile, ‘TBName’, data[i])
// 写入flowFile中
session.write(flowFile, {outputStream ->
outputStream.write(data[i].toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// flowFile —> success
session.transfer(flowFile, REL_SUCCESS)
}

附2:

import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
// 批量获取流文件
flowFileList=session.get(150)
if (!flowFileList)return
flowFileList.each {
flowFile ->
if (!flowFile) {return}
String content = ‘’
session.read(flowFile, {inputStream ->
content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
def data = new JsonSlurper().parseText(content)
data.each{
list->
// 获取rowkey
def row = list.row
list.cells.each{
cell ->
// 声明容器存储结果
StringBuffer stringBuffer = new StringBuffer()
newFlowFile = session.create()
// 添加属性,CF(列簇名)
session.putAttribute(newFlowFile, ‘CF’, cell.fam)
// 传递属性,TBName(HBase的表名)
session.putAttribute(newFlowFile,’TBName’,flowFile.getAttribute(‘TBName’))
// 拼接拆分的数据为json
stringBuffer.append(“{\”row\”:\”” + row + “\”,\”” + cell.qual + “\”:\”” + cell.val + “\”}” )
// 写入newFlowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// newFlowFile —> success
session.transfer(newFlowFile, REL_SUCCESS)
}
}
}
// flowFile —> failure
session.transfer(flowFileList,REL_FAILURE)