HBase To HBase
I. Source Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processors
2.1 ExecuteGroovyScript(1)
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
d) Script content: see Appendix 1
2.2 ScanHBase
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.3 ExecuteGroovyScript(2)
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
d) Script content: see Appendix 2
2.4 HBaseClientService
II. Target Side
1. Detailed Flow Structure
1.1 Outer Layer
1.2 Inner Layer
2. Processors
2.1 NiFi Flow
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
2.2 PutHBaseJson
a) SETTINGS
b) SCHEDULING
c) PROPERTIES
III. Appendix
Appendix 1:
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.OutputStreamCallback

// Read the comma-separated list of table names from the TBList dynamic property
// (assumes ExecuteGroovyScript binds dynamic properties as PropertyValue objects)
String tbList = TBList?.value
if (!tbList) {
    return
}
// Split the list and skip blank entries (e.g. from "t1,,t2")
List<String> tables = tbList.split(',')*.trim().findAll { it }
// Emit one FlowFile per table
tables.each { String tbName ->
    def flowFile = session.create()
    // Add the TBName attribute
    flowFile = session.putAttribute(flowFile, 'TBName', tbName)
    // Write the table name into the content; session.write returns the updated FlowFile
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(tbName.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
}
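The table-list splitting in Appendix 1 can be checked outside NiFi with a plain Groovy function. This is a minimal sketch: `splitTableNames` is a hypothetical helper name (not a NiFi API), and it assumes blank entries should simply be skipped. Each name it returns would correspond to one FlowFile carrying a `TBName` attribute.

```groovy
// Hypothetical helper: split a comma-separated table list into clean names.
List<String> splitTableNames(String tbList) {
    if (!tbList) {
        return []
    }
    // Split on commas, trim whitespace, and drop empty entries such as in "t1,,t2"
    return tbList.split(',')*.trim().findAll { it }
}

// Usage: each returned name would become one FlowFile with a TBName attribute
println splitTableNames('ns:user, order,,log')   // prints [ns:user, order, log]
```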
Appendix 2:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Fetch up to 150 FlowFiles in one batch
def flowFileList = session.get(150)
if (!flowFileList) return
flowFileList.each { flowFile ->
    // Read the ScanHBase JSON output
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def rows = new JsonSlurper().parseText(content)
    rows.each { rowObj ->
        // Get the rowkey
        def row = rowObj.row
        rowObj.cells.each { cell ->
            def newFlowFile = session.create()
            // Add attribute CF (column family name)
            newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam as String)
            // Pass along attribute TBName (HBase table name)
            newFlowFile = session.putAttribute(newFlowFile, 'TBName', flowFile.getAttribute('TBName'))
            // Build {"row": rowkey, qualifier: value}; JsonOutput escapes quotes that concatenation would not
            String json = JsonOutput.toJson([row: row, (cell.qual): cell.val])
            // Write the JSON into newFlowFile; session.write returns the updated FlowFile
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write(json.getBytes(StandardCharsets.UTF_8))
            } as OutputStreamCallback)
            // newFlowFile --> success
            session.transfer(newFlowFile, REL_SUCCESS)
        }
    }
}
// Original (unsplit) FlowFiles --> failure
session.transfer(flowFileList, REL_FAILURE)
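The per-cell split in Appendix 2 can likewise be tested without a NiFi session. This sketch assumes the ScanHBase content is a JSON array of rows shaped like the fields the script reads (`row`, plus `cells` entries with `fam`, `qual` and `val`); `splitCells` is a hypothetical helper name. It builds each record with `groovy.json.JsonOutput` rather than string concatenation, so a value containing a double quote still produces valid JSON.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: turn ScanHBase-style JSON into [columnFamily, json] pairs,
// one pair per cell, mirroring what Appendix 2 emits as separate FlowFiles.
List<List<String>> splitCells(String scanJson) {
    def result = []
    new JsonSlurper().parseText(scanJson).each { rowObj ->
        rowObj.cells.each { cell ->
            // {"row": <rowkey>, <qualifier>: <value>}, escaped by JsonOutput
            String json = JsonOutput.toJson([row: rowObj.row, (cell.qual): cell.val])
            result << [cell.fam as String, json]
        }
    }
    return result
}

// Usage: one row with two cells; the first value contains a double quote
def sample = '[{"row":"r1","cells":[{"fam":"cf1","qual":"name","val":"a\\"b"},{"fam":"cf2","qual":"age","val":"30"}]}]'
splitCells(sample).each { println it }
```

Each pair corresponds to one output FlowFile: the first element would go into the `CF` attribute and the second into the content.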