国电电力数据同步传输管控平台
(HiveToHive流程部署)

目录
一、源端 2
1、结构展示 3
1.1 外层 3
1.2 内层 3
2、Processor 3
2.1 ExecuteGroovyScript(1) 3
2.2 SelectHiveQL(1) 5
2.3 ExecuteGroovyScript(2) 6
2.4 SelectHiveQL(2) 8
2.5 HiveConnectionPool、Hive_1_1ConnectionPool 9
二、目标端 10
1、结构展示 10
1.1 外层 10
1.2 内层 11
2、Processor 11
2.1 NiFi Flow 11
2.2 PutHiveStreaming 12
三、附页 14
附1: 14
附2: 15
附3: 17

一、源端

1、结构展示

1.1 外层

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图1

1.2 内层

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图2

2、Processor

2.1 ExecuteGroovyScript(1)

a) SETTINGS

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图3

b) SCHEDULING

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图4

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图5

2.2 SelectHiveQL(1)

a) SETTINGS

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图6

b) SCHEDULING

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图7

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图8

2.3 ExecuteGroovyScript(2)

a) SETTINGS

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图9

b) SCHEDULING

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图10

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图11

2.4 SelectHiveQL(2)

a) SETTINGS

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图12

b) SCHEDULING

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图13

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图14

2.5 HiveConnectionPool、Hive_1_1ConnectionPool

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图15
06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图16

a) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图17
注:HiveConnectionPool和Hive_1_1ConnectionPool配置参数一样

二、目标端

1、结构展示

1.1 外层

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图18

1.2 内层

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图19

2、Processor

2.1 NiFi Flow

a) SETTINGS

b) SCHEDULING

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图20
06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图21

2.2 PutHiveStreaming

a) SETTINGS

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图22

b) SCHEDULING

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图23

c) PROPERTIES

06_国电电力数据同步传输管控平台-HiveToHive流程部署_v1.0 - 图24

三、附页

附1:

import java.nio.charset.StandardCharsets
// 获取以逗号拆分的库名字符串
String DBList = DBList
// 拆分库名
String[] data = DBList.split(“,”)
// 获取数据长度
int length = data.length;
if (length==0){
return
}
// 循环所有库
for (Integer i = 0; i < length; i++) {
if (data[i]==””){
session.transfer(flowFile,REL_FAILURE)
continue
}
flowFile = session.create()
// 添加属性
flowFile = session.putAttribute(flowFile, ‘DBName’, data[i])
// 写入flowFile中
session.write(flowFile, {outputStream ->
outputStream.write(data[i].toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// flowFile —> success
session.transfer(flowFile, REL_SUCCESS)
}

附2:

// 导入相关类
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
// 获取流文件
flowFile = session.get()
// 读取文件内容
def text = ‘’
session.read(flowFile, {
inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback
)
// 获取DBName
def DBName = flowFile.getAttribute(‘DBName’)
if (text == ‘’) {
session.transfer(flowFile, REL_FAILURE)
return
}
// 拆分表名
String[] data = text.split(“\n”)
// 获取数据长度
int length = data.length;
// 循环所有库
for (Integer i = 0; i < length; i++) {
newFlowFile = session.create()
// 添加属性
newFlowFile = session.putAttribute(newFlowFile, ‘DBName’, DBName)
// 添加属性
newFlowFile = session.putAttribute(newFlowFile, ‘TBName’, data[i])
// 声明容器存储结果
StringBuffer stringBuffer = new StringBuffer()
stringBuffer.append(“{\”DBName\”:\””+ DBName + “\”,\”TBNaem\”:\”” + data[i] + “\”}”)
// 写入flowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// flowFile —> success
session.transfer(newFlowFile, REL_SUCCESS)
}
// flowFile —> success
session.transfer(flowFile, REL_FAILURE)

附3:

整个流程为通过用户传入库名来获取整个库数据,如果需要拉单个表,源端只需要SelectHiveQL一个Processor就行,用户自定义SQL,目标端PutHiveStraming也是用户自定义入库库名和表名。