I. Source Side
1. Database-Level Sync Flow
1.1 Inside the ProcessGroup
1.1.1 ConfigureProcessor
ExecuteGroovyScript
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | true = automatically terminated |
| | | success [false] | |
| SCHEDULING | Run Schedule | 99999999999999999 sec | |
| | Execution | Primary node | |
| PROPERTIES | Script Body | `import java.nio.charset.StandardCharsets; import org.apache.nifi.processor.io.OutputStreamCallback; String databases = DBList; String[] databaseList = databases.split(","); for (db in databaseList) { writeFlowFile(db.trim()) }; void writeFlowFile(String str) { def newFlowFile = session.create(); newFlowFile = session.putAttribute(newFlowFile, 'query.input.database', str); newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(str.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback); session.transfer(newFlowFile, REL_SUCCESS) }` | Emits one FlowFile per database name |
| | DBList | (user input) | Database names supplied by the user, comma-separated |
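The script emits one FlowFile per database name and stores the name both as content and as the `query.input.database` attribute. A minimal Java sketch of that splitting follows. `FlowFileSketch` is a hypothetical holder standing in for a real NiFi FlowFile, the database names are made up, and this sketch also trims each name and skips blank entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a NiFi FlowFile: content plus attributes.
final class FlowFileSketch {
    final String content;
    final Map<String, String> attributes;

    FlowFileSketch(String content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }
}

final class DbListSplitter {
    // One FlowFile per comma-separated name; the name is written both as
    // content and as the 'query.input.database' attribute.
    static List<FlowFileSketch> split(String dbList) {
        List<FlowFileSketch> out = new ArrayList<>();
        for (String raw : dbList.split(",")) {
            String db = raw.trim();
            if (db.isEmpty()) {
                continue; // skip blank entries, e.g. from "a,,b"
            }
            Map<String, String> attrs = new LinkedHashMap<>();
            attrs.put("query.input.database", db);
            out.add(new FlowFileSketch(db, attrs));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical DBList value with a stray space and a trailing comma.
        for (FlowFileSketch ff : split("sales, finance,")) {
            System.out.println(ff.content + " -> " + ff.attributes);
        }
    }
}
```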
ExecuteSQL
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL Pre-Query | use ${query.input.database}; | |
| | SQL select query | show tables; | |
| | Max Rows Per Flow File | 1 | |
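In SQL Pre-Query, `${...}` is resolved against the attributes of the incoming FlowFile, so the name inside the braces must match the attribute the upstream script writes. NiFi's Expression Language yields an empty string for a missing attribute. The simplified Java stand-in below is not NiFi's Expression Language engine (the class name is hypothetical, and it supports no functions or nesting), but it shows how a mismatched name silently produces `use ;`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AttributeTemplate {
    // Matches ${attribute.name}; no functions, nesting or escaping supported.
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String template, Map<String, String> attributes) {
        Matcher m = REF.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            // A missing attribute resolves to "".
            String value = attributes.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("query.input.database", "sales"); // hypothetical database
        System.out.println(resolve("use ${query.input.database};", attrs));  // use sales;
        System.out.println(resolve("use ${query.input.databases};", attrs)); // use ;
    }
}
```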
ConvertAvroToJSON
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
EvaluateJsonPath
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | matched [false] | |
| | | unmatched [true] | |
| PROPERTIES | Destination | flowfile-attribute | |
| | query.input.table | $.name | |
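Impala's `show tables` returns a single column called `name`. With Max Rows Per Flow File set to 1, each JSON FlowFile should therefore hold one object such as `{"name": "orders"}` (the table name here is hypothetical), and `$.name` copies that value into the `query.input.table` attribute. The sketch below imitates this with a regular expression rather than a real JsonPath engine, so it only handles a flat object whose `name` value contains no escape sequences.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class NameExtractor {
    // Simplified stand-in for the JsonPath $.name on a flat object whose
    // "name" value is a plain string without escape sequences.
    private static final Pattern NAME =
            Pattern.compile("\"name\"\\s*:\\s*\"([^\"\\\\]*)\"");

    // Returns a copy of the attributes with query.input.table added when found.
    static Map<String, String> evaluate(String json, Map<String, String> attributes) {
        Map<String, String> out = new HashMap<>(attributes);
        Matcher m = NAME.matcher(json);
        if (m.find()) {
            out.put("query.input.table", m.group(1));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("query.input.database", "sales"); // hypothetical database
        Map<String, String> result = evaluate("{\"name\": \"orders\"}", attrs);
        System.out.println(result.get("query.input.table")); // orders
    }
}
```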
ExecuteSQL
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| SCHEDULING | Concurrent Tasks | 3 | |
| | Execution | All nodes | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL select query | select * from ${query.input.database}.${query.input.table}; | |
| | Normalize Table/Column Names | true | |
| | Normalize Table/Column Names | 20000 | |
| | Output Batch Size | 2000 | |
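This processor receives one FlowFile per (database, table) pair and fills the `select * from ...` template for each one. Concurrent Tasks = 3 lets up to three of those queries run at once on a node. The sketch below draws a loose analogy with a fixed pool of three threads. It is not how NiFi schedules tasks internally, each task only builds the query string, and the database and table names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TableQueryFanOut {
    // Same template as the processor's SQL select query.
    static String selectAll(String database, String table) {
        return "select * from " + database + "." + table + ";";
    }

    // Builds one query per (database, table) pair on a pool of
    // 'concurrentTasks' threads; results keep submission order.
    static List<String> buildAll(Map<String, List<String>> tablesByDb, int concurrentTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrentTasks);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Map.Entry<String, List<String>> entry : tablesByDb.entrySet()) {
                String db = entry.getKey();
                for (String table : entry.getValue()) {
                    futures.add(pool.submit(() -> selectAll(db, table)));
                }
            }
            List<String> queries = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    queries.add(f.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return queries;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> tables = new LinkedHashMap<>();
        tables.put("sales", Arrays.asList("orders", "customers")); // hypothetical
        tables.put("finance", Arrays.asList("ledger"));            // hypothetical
        buildAll(tables, 3).forEach(System.out::println);
    }
}
```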
ExecuteSQL.DBCPConnectionPool
| Tab | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | Database Connection URL | jdbc:impala://10.65.80.4:21050 | Impala JDBC connection URL |
| | Database Driver Class Name | com.cloudera.impala.jdbc41.Driver | Driver class |
| | Database Driver Location(s) | /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ | Directory containing the required driver JAR(s) |
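DBCPConnectionPool loads the driver from the directory given in Database Driver Location(s). The same URL and driver class can be checked outside NiFi with plain JDBC, as in the sketch below. It assumes the Cloudera Impala JDBC JAR is on the classpath and that the URL alone is enough to connect, which matches the settings above. A secured cluster would need additional connection properties that are not shown here. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

final class ImpalaJdbcSketch {
    // Driver class from the DBCPConnectionPool settings.
    static final String DRIVER = "com.cloudera.impala.jdbc41.Driver";

    static String url(String host, int port) {
        return "jdbc:impala://" + host + ":" + port;
    }

    // Approximates the first ExecuteSQL: "use <db>" as pre-query, then
    // "show tables". The database name is concatenated as-is, so it must
    // come from trusted configuration.
    static List<String> listTables(String jdbcUrl, String database)
            throws ClassNotFoundException, SQLException {
        Class.forName(DRIVER); // fails unless the driver JAR is on the classpath
        List<String> tables = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            stmt.execute("use " + database);
            try (ResultSet rs = stmt.executeQuery("show tables")) {
                while (rs.next()) {
                    tables.add(rs.getString(1)); // the single 'name' column
                }
            }
        }
        return tables;
    }

    public static void main(String[] args) throws Exception {
        String jdbcUrl = url("10.65.80.4", 21050);
        System.out.println(jdbcUrl);
        if (args.length > 0) { // pass a database name to actually connect
            System.out.println(listTables(jdbcUrl, args[0]));
        }
    }
}
```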
1.2 Outside the ProcessGroup
1.2.1 ConfigureProcessor
OutputPort
| Parameter | Value |
|---|---|
| Concurrent Tasks | 3 |
2. Table-Level Sync Flow
2.1 Flow Overview
2.1.1 ConfigureProcessor
ExecuteSQL
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [false] | |
| SCHEDULING | Run Schedule | 99999999999999 sec | |
| | Concurrent Tasks | 1 | |
| | Execution | Primary node | |
| PROPERTIES | Database Connection Pooling Service | See ExecuteSQL.DBCPConnectionPool | |
| | SQL select query | (the SQL statement to execute) | |
| | Normalize Table/Column Names | true | |
| | Normalize Table/Column Names | 20000 | |
| | Output Batch Size | 2000 | |
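Assuming the default Timer driven scheduling strategy, a processor runs when it is started and then waits one Run Schedule period before its next run. A period this long therefore makes it effectively run once per start, and Primary node limits that single run to one node. The arithmetic below converts both Run Schedule values used in these flows into Julian years of 365.25 days.

```java
final class RunScheduleMath {
    // 365.25 days * 86,400 seconds per day
    static final long SECONDS_PER_JULIAN_YEAR = 31_557_600L;

    static long years(long seconds) {
        return seconds / SECONDS_PER_JULIAN_YEAR;
    }

    public static void main(String[] args) {
        System.out.println(years(99_999_999_999_999L));     // 3168808 (table-level ExecuteSQL)
        System.out.println(years(99_999_999_999_999_999L)); // 3168808781 (ExecuteGroovyScript)
    }
}
```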
ExecuteSQL.DBCPConnectionPool
| Tab | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | Database Connection URL | jdbc:impala://10.65.80.4:21050 | Impala JDBC connection URL |
| | Database Driver Class Name | com.cloudera.impala.jdbc41.Driver | Driver class |
| | Database Driver Location(s) | /Users/d/Desktop/WorkSpace/Resource/jar/Impala/Cloudera_ImpalaJDBC41_2.5.41/ | Directory containing the required driver JAR(s) |
OutputPort
| Parameter | Value |
|---|---|
| Concurrent Tasks | 3 |
II. Target Side
1.1.1 ConfigureProcessor
RemoteProcessGroup
| Tab | Parameter | Value | Description |
|---|---|---|---|
| PROPERTIES | URLs | http://localhost:18081/nifi | Remote NiFi URL |
| | Transport Protocol | HTTP | NiFi transport protocol |
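The URLs field expects full URLs of the form `http://host:port/nifi`, and several cluster nodes can be listed separated by commas. A value missing the `//`, such as `http:localhost:18081/nifi`, is parsed by `java.net.URI` as an opaque URI that has no host. The hypothetical helper below splits the field on commas and rejects any entry without a host.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

final class RemoteUrlCheck {
    // Parses a comma-separated URLs value and rejects entries without a host.
    static List<URI> parse(String urls) {
        List<URI> out = new ArrayList<>();
        for (String raw : urls.split(",")) {
            URI uri = URI.create(raw.trim());
            if (uri.getHost() == null) {
                throw new IllegalArgumentException("no host in: " + raw.trim());
            }
            out.add(uri);
        }
        return out;
    }

    public static void main(String[] args) {
        URI ok = parse("http://localhost:18081/nifi").get(0);
        System.out.println(ok.getHost() + " " + ok.getPort() + " " + ok.getPath()); // localhost 18081 /nifi
        try {
            parse("http:localhost:18081/nifi");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // no host in: http:localhost:18081/nifi
        }
    }
}
```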
PutKudu
| Tab | Parameter | Value | Description |
|---|---|---|---|
| SETTINGS | Relationships | failure [true] | |
| | | success [true] | |
| SCHEDULING | Concurrent Tasks | 3 | |
| | Execution | All nodes | |
| PROPERTIES | Kudu Masters | 10.65.80.4 | Kudu master address(es), comma-separated |
| | Table Name | ${query.input.table} | For database-level sync, an Expression Language reference such as ${...} can be used; for table-level sync, the table name must be specified explicitly. |
| | Normalize Table/Column Names | true | |
| | Normalize Table/Column Names | 20000 | |
| | Output Batch Size | 2000 | |
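Kudu Masters takes a comma-separated list of master addresses, and the value above gives only a host. The Kudu documentation lists 7051 as the default master RPC port. This sketch assumes that an address without `:port` means that port, and it normalizes the list accordingly. It handles hostnames and IPv4 addresses only. The second address in the usage is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class KuduMasters {
    // Assumption: an entry without ":port" uses Kudu's default master RPC port.
    static final int DEFAULT_MASTER_PORT = 7051;

    // Hostnames and IPv4 only: an IPv6 literal also contains ':'.
    static List<String> normalize(String masters) {
        List<String> out = new ArrayList<>();
        for (String raw : masters.split(",")) {
            String host = raw.trim();
            if (host.isEmpty()) {
                continue;
            }
            out.add(host.contains(":") ? host : host + ":" + DEFAULT_MASTER_PORT);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(normalize("10.65.80.4")); // [10.65.80.4:7051]
        System.out.println(normalize("10.65.80.4, 10.65.80.5:7051")); // second host hypothetical
    }
}
```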