一、源端

1、库级同步流程

1.1 ProcessGroup内

image.png

1.1.1 ConfigureProcessor

ExecuteGroovyScript
配置项 参数名 配置
SETTINGS Relationships failure【true】
success【false】
SCHEDULING Run Schedule 99999999999999999 sec
Execution Primary node
PROPERTIES Script Body import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

String databases=DBList;
String[] databaseList = databases.split(“,”);
for ( db in databaseList) {
writeFlowFile(db)
}

void writeFlowFile(String str) {
newFlowFile = session.create();
newFlowFile = session.putAttribute(newFlowFile, ‘query.input.database’, str);
session.write(newFlowFile, { outputStream ->
outputStream.write(str.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(newFlowFile, REL_SUCCESS)
}
DBList 用户传入库名(多个逗号分隔)

ExecuteSQL
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
success【false】
PROPERTIES Database Connection Pooling Service
见ExecuteSQL.DBCPConnectionPool
SQL Pre-Query use ${query.input.databases};

SQL select-Query show tables;

Max Rows Per Flow File 1

ConvertAvroToJSON
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
success【false】

EvaluateJsonPath
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
matched【false】
unmatched【true】
PROPERTIES Destination flowfile-attribute
query.input.table $.name

ExecuteSQL
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
success【false】
SCHEDULING Concurrent Tasks 3
Execution All nodes
PROPERTIES Database Connection Pooling Service
见ExecuteSQL.DBCPConnectionPool
SQL select-Query select * from ${query.input.database}.${query.input.table};
Normalize Table/Column Names true
Normalize Table/Column Names 20000
Output Batch Size 2000

ExecuteSQL.DBCPConnectionPool
配置项 参数名 配置 描述
PROPERTIES Database Connection URL jdbc:impala://10.65.80.4:21050 impala的JDBC连接地址
Database Driver Class Name com.cloudera.impala.jdbc41.Driver 驱动类
Database Driver Location(s) /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ 所需驱动jar的地址

1.2 ProcessGroup外

image.png

1.2.1 ConfigureProcessor

OutputPort
Concurrent Tasks 3

2、表级同步流程

2.1 流程展示

image.png

2.1.1 ConfigureProcessor

ExecuteSQL
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
success【false】
SCHEDULING Run Schedule 99999999999999 sec
Concurrent Tasks 1
Execution Primary node
PROPERTIES Database Connection Pooling Service
见ExecuteSQL.DBCPConnectionPool
SQL select-Query 执行SQL
Normalize Table/Column Names true
Normalize Table/Column Names 20000
Output Batch Size 2000

ExecuteSQL.DBCPConnectionPool
配置项 参数名 配置 描述
PROPERTIES Database Connection URL jdbc:impala://10.65.80.4:21050 impala的JDBC连接地址
Database Driver Class Name com.cloudera.impala.jdbc41.Driver 驱动类
Database Driver Location(s) /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ 所需驱动jar的地址

OutputPort
Concurrent Tasks 3

二、目标端

1.1.1 ConfigureProcessor

RemoteProcessGroup
配置项 参数名 配置 描述
PROPERTIES URLs http:localhost:18081/nifi nifi的远程地址
Transport Protocol HTTP nifi传输协议
Database Driver Location(s) /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ 所需驱动jar的地址

PutKudu
配置项 参数名 配置 描述
SETTINGS Relationships failure【true】
success【true】
SCHEDULING Concurrent Tasks 3
Execution All nodes
PROPERTIES Kudu Masters 10.65.80.4 kudu Master的地址(多个逗号隔开)
Table Name ${query.input.table} 库级别同步可以指定变量${},表级别同步必须追定表名。
Normalize Table/Column Names true
Normalize Table/Column Names 20000
Output Batch Size 2000