HBase To HBase

I. Source Side

1. Detailed Flow Structure

1.1 Outer Layer

HBase To HBase cross-cluster transfer detailed flow - Figure 1

1.2 Inner Layer

HBase To HBase cross-cluster transfer detailed flow - Figure 2

2. Processors

2.1 ExecuteGroovyScript(1)

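a) SETTINGS
HBase To HBase cross-cluster transfer detailed flow - Figure 3
b) SCHEDULING
HBase To HBase cross-cluster transfer detailed flow - Figure 4
c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 5
d) Script content: see Appendix 1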

2.2 ScanHBase

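a) SETTINGS
HBase To HBase cross-cluster transfer detailed flow - Figure 6
b) SCHEDULING
HBase To HBase cross-cluster transfer detailed flow - Figure 7
c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 8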

2.3 ExecuteGroovyScript(2)

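a) SETTINGS
HBase To HBase cross-cluster transfer detailed flow - Figure 9
b) SCHEDULING
HBase To HBase cross-cluster transfer detailed flow - Figure 10
c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 11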
d) Script content: see Appendix 2

2.4 HBaseClientService

c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 12
HBase To HBase cross-cluster transfer detailed flow - Figure 13

II. Target Side

1. Detailed Flow Structure

1.1 Outer Layer

HBase To HBase cross-cluster transfer detailed flow - Figure 14

1.2 Inner Layer

HBase To HBase cross-cluster transfer detailed flow - Figure 15

2. Processors

2.1 NiFi Flow

a) SETTINGS
b) SCHEDULING
c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 16

2.2 PutHBaseJson

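a) SETTINGS
HBase To HBase cross-cluster transfer detailed flow - Figure 17
b) SCHEDULING
HBase To HBase cross-cluster transfer detailed flow - Figure 18
c) PROPERTIES
HBase To HBase cross-cluster transfer detailed flow - Figure 19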

III. Appendix

Appendix 1:

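import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.OutputStreamCallback

// Read the comma-separated list of table names from the TBList property
String tableList = TBList
// Split the list into individual table names
String[] tables = tableList.split(",")
// Nothing to do if the list yields no entries
if (tables.length == 0) {
    return
}

// Loop over all tables and emit one flowFile per table
for (String table : tables) {
    // Skip empty entries (e.g. "a,,b"); no flowFile exists yet that could be routed to failure
    if (table == "") {
        log.warn("Skipping empty table name in TBList")
        continue
    }
    def flowFile = session.create()

    // Add the table name as the TBName attribute
    flowFile = session.putAttribute(flowFile, 'TBName', table)

    // Write the table name into the flowFile content
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(table.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
}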
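
The splitting step in Appendix 1 depends on how `String.split` treats empty entries. The following is a minimal standalone sketch in plain Groovy (no NiFi session involved) that checks those edge cases. `parseTableList` is a hypothetical helper introduced only for illustration, and trimming whitespace around names is an assumption that the original script does not make.

```groovy
// Hypothetical helper, not part of the original flow: splits a comma-separated
// table list, trims each entry, and drops blank entries.
List<String> parseTableList(String tableList) {
    if (tableList == null) {
        return []
    }
    def parts = tableList.split(",")
    def trimmed = parts.collect { it.trim() }
    return trimmed.findAll { !it.isEmpty() }
}

// An empty input still yields one (empty) entry, so a length == 0 check never fires
assert "".split(",").length == 1
// An empty entry in the middle is kept
assert "a,,b".split(",").length == 3
// Trailing empty entries are dropped
assert "a,b,,".split(",").length == 2
// The helper returns only the non-blank names
assert parseTableList("t1, t2,,t3") == ["t1", "t2", "t3"]
```

Filtering blank names before creating any flowFile avoids having to route a flowFile that does not exist yet.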

Appendix 2:

import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Fetch up to 150 flowFiles in one batch
def flowFileList = session.get(150)
if (!flowFileList) return

flowFileList.each { flowFile ->
    if (!flowFile) { return }
    // Read the flowFile content as a UTF-8 string
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def data = new JsonSlurper().parseText(content)
    data.each { list ->
        // Get the rowkey
        def row = list.row

        list.cells.each { cell ->
            def newFlowFile = session.create()
            // Add attribute CF (column family name)
            newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam)
            // Pass on attribute TBName (HBase table name)
            newFlowFile = session.putAttribute(newFlowFile, 'TBName', flowFile.getAttribute('TBName'))
            // Assemble the split data as JSON
            String json = "{\"row\":\"" + row + "\",\"" + cell.qual + "\":\"" + cell.val + "\"}"
            // Write the JSON into newFlowFile
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write(json.getBytes(StandardCharsets.UTF_8))
            } as OutputStreamCallback)
            // newFlowFile --> success
            session.transfer(newFlowFile, REL_SUCCESS)
        }
    }
}

// Original flowFiles --> failure
session.transfer(flowFileList, REL_FAILURE)
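
The script in Appendix 2 assembles each output document by string concatenation, which produces invalid JSON whenever a rowkey, qualifier, or value contains a double quote or a backslash. The following is a minimal sketch of the same per-cell split using `groovy.json.JsonOutput`, which escapes such characters. It runs as plain Groovy without a NiFi session. The `splitCells` helper and the sample input are assumptions made up for illustration; the input shape (`row`, `cells`, `fam`, `qual`, `val`) mirrors the fields the script reads.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: turns parsed scan rows into one [cf, json] entry per cell.
List<Map> splitCells(String scanJson) {
    def rows = new JsonSlurper().parseText(scanJson)
    def result = []
    rows.each { r ->
        r.cells.each { cell ->
            // A map key in parentheses is evaluated, so the qualifier becomes the JSON field name
            def doc = [row: r.row, (cell.qual): cell.val]
            result << [cf: cell.fam, json: JsonOutput.toJson(doc)]
        }
    }
    return result
}

// Sample input made up for illustration; the value contains double quotes
def sample = '[{"row":"r1","cells":[{"fam":"cf","qual":"name","val":"say \\"hi\\""}]}]'
def out = splitCells(sample)
assert out.size() == 1
assert out[0].cf == 'cf'
// Parsing the generated JSON gives back the original value, quotes included
assert new JsonSlurper().parseText(out[0].json).name == 'say "hi"'
```

In the NiFi script, the string returned by `JsonOutput.toJson` could take the place of the concatenated string that is written to `newFlowFile`.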