I. Source Side
1. Database-Level Sync Flow
1.1 Inside the ProcessGroup
1.1.1 ConfigureProcessor
ExecuteGroovyScript
| Section | Parameter | Value |
|---|---|---|
| SETTINGS | Relationships | failure [true] |
| | | success [false] |
| SCHEDULING | Run Schedule | 99999999999999999 sec |
| | Execution | Primary node |
| PROPERTIES | Script Body | `import groovy.json.JsonSlurper; import java.nio.charset.StandardCharsets; String databases = DBList; String[] databaseList = databases.split(","); for (db in databaseList) { writeFlowFile(db) } void writeFlowFile(String str) { def newFlowFile = session.create(); newFlowFile = session.putAttribute(newFlowFile, 'query.input.database', str); session.write(newFlowFile, { outputStream -> outputStream.write(str.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback); session.transfer(newFlowFile, REL_SUCCESS) }` |
| | DBList | Database names supplied by the user (comma-separated if more than one) |
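The split step in the Script Body can be tried outside NiFi. The following is a minimal standalone sketch, not the script deployed in the processor: `splitDatabases` and `buildFlowFiles` are hypothetical helpers, plain maps stand in for NiFi FlowFiles, and the trimming and empty-entry filtering are additions that the original script does not perform.

```groovy
import java.nio.charset.StandardCharsets

// Hypothetical helper: split the comma-separated DBList value.
// Trimming and dropping empty entries are additions for illustration.
List<String> splitDatabases(String dbList) {
    dbList.split(',').collect { it.trim() }.findAll { it }
}

// Stand-in for session.create() / putAttribute() / write():
// one map per FlowFile the script would emit.
List<Map> buildFlowFiles(String dbList) {
    splitDatabases(dbList).collect { db ->
        [attributes: ['query.input.database': db],
         content   : db.getBytes(StandardCharsets.UTF_8)]
    }
}

def flowFiles = buildFlowFiles('sales, finance,hr')
flowFiles.each { println it.attributes['query.input.database'] }
```

Each resulting entry carries the database name both as the `query.input.database` attribute and as the content, which mirrors what the processor script writes to `REL_SUCCESS`.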
ExecuteSQL
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL Pre-Query | use ${query.input.database}; | |
| | SQL select query | show tables; | |
| | Max Rows Per Flow File | 1 | |
ConvertAvroToJSON
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
EvaluateJsonPath
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | matched [false] | |
| | | unmatched [true] | |
| PROPERTIES | Destination | flowfile-attribute | |
| | query.input.table | $.name | |
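To make the `$.name` mapping concrete, here is a small sketch of what this step amounts to. It assumes the FlowFile content at this point is a single JSON object such as `{"name": "orders"}` (one row of the `show tables` result); `extractTableAttribute` is a hypothetical helper and uses Groovy's `JsonSlurper` rather than NiFi's JsonPath engine.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: mimics EvaluateJsonPath with the path $.name
// and Destination = flowfile-attribute.
Map<String, String> extractTableAttribute(String json) {
    def record = new JsonSlurper().parseText(json)
    // A missing "name" field corresponds to the "unmatched" relationship.
    record.name == null ? [:] : ['query.input.table': record.name.toString()]
}

def attrs = extractTableAttribute('{"name": "orders"}')
println attrs
```

An empty result map in this sketch corresponds to the case where the processor would route the FlowFile to `unmatched`.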
ExecuteSQL
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| SCHEDULING | Concurrent Tasks | 3 | |
| | Execution | All nodes | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL select query | select * from ${query.input.database}.${query.input.table}; | |
| | Normalize Table/Column Names | true | |
| | Max Rows Per Flow File | 20000 | |
| | Output Batch Size | 2000 | |
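The select query is assembled per FlowFile from the two attributes set earlier in the flow. The sketch below illustrates that substitution only; `resolve` is a hypothetical helper covering plain attribute lookups, not NiFi's Expression Language, and failing fast on a missing attribute is a choice made here for illustration.

```groovy
// Hypothetical helper: replaces ${attribute} references with values from a map.
String resolve(String template, Map<String, String> attributes) {
    template.replaceAll('\\$\\{([^}]+)\\}') { String whole, String key ->
        if (!attributes.containsKey(key)) {
            throw new IllegalArgumentException('missing attribute: ' + key)
        }
        attributes[key]
    }
}

// Single quotes keep Groovy from interpolating ${...} itself.
def template = 'select * from ${query.input.database}.${query.input.table};'
def sql = resolve(template, ['query.input.database': 'sales',
                             'query.input.table'   : 'orders'])
println sql
```

With the example attributes `sales` and `orders`, the resolved statement is `select * from sales.orders;`.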
ExecuteSQL.DBCPConnectionPool
| Section | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | Database Connection URL | jdbc:impala://10.65.80.4:21050 | JDBC connection URL for Impala |
| | Database Driver Class Name | com.cloudera.impala.jdbc41.Driver | Driver class |
| | Database Driver Location(s) | /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ | Location of the required driver JAR(s) |
1.2 Outside the ProcessGroup
1.2.1 ConfigureProcessor
OutputPort
| Parameter | Value |
|---|---|
| Concurrent Tasks | 3 |
2. Table-Level Sync Flow
2.1 Flow Overview
2.1.1 ConfigureProcessor
ExecuteSQL
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| SCHEDULING | Run Schedule | 99999999999999 sec | |
| | Concurrent Tasks | 1 | |
| | Execution | Primary node | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL select query | The SQL statement to execute | |
| | Normalize Table/Column Names | true | |
| | Max Rows Per Flow File | 20000 | |
| | Output Batch Size | 2000 | |
ExecuteSQL.DBCPConnectionPool
| Section | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | Database Connection URL | jdbc:impala://10.65.80.4:21050 | JDBC connection URL for Impala |
| | Database Driver Class Name | com.cloudera.impala.jdbc41.Driver | Driver class |
| | Database Driver Location(s) | /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ | Location of the required driver JAR(s) |
OutputPort
| Parameter | Value |
|---|---|
| Concurrent Tasks | 3 |
II. Target Side
1.1.1 ConfigureProcessor
RemoteProcessGroup
| Section | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | URLs | http://localhost:18081/nifi | URL of the remote NiFi instance |
| | Transport Protocol | HTTP | NiFi transport protocol |
| | Database Driver Location(s) | /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ | Location of the required driver JAR(s) |
PutKudu
| Section | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [true] | |
| SCHEDULING | Concurrent Tasks | 3 | |
| | Execution | All nodes | |
| PROPERTIES | Kudu Masters | 10.65.80.4 | Address of the Kudu master(s), comma-separated if more than one |
| | Table Name | ${query.input.table} | For database-level sync, a variable (${}) can be used; for table-level sync, the table name must be specified explicitly. |
| | Normalize Table/Column Names | true | |
| | Normalize Table/Column Names | 20000 | |
| | Output Batch Size | 2000 | |