Note: The historical and real-time paths share the same processing flow; they differ only in the JSON format of the POST request and in the JOLT parsing rules.
I. Source Side
1. Detailed Flow Structure
1.1 Outer Layer (Process Group)
1.2 Inner Layer
2. Processors
2.1 GenerateFlowFile
a) Function Overview
Generates a flow file from the specified text (the POST request body) and passes it to the next processor (InvokeHTTP).
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
Real-time request format:
{
    "tagNames": [
        "T_UNIT1.P_SELMW",        // tag point name
        "T_UNIT1.P_10FGDTL206Q"
    ]
}
Historical request format:
{
    "tagNames": [
        "T_UNIT1.P_SELMW",
        "T_UNIT1.P_10FGDTL206Q",
        "T_UNIT1.P_10FGDTL205Q",
        "T_UNIT1.P_10FGDTL204Q"
    ],
    "count": 4,
    "startTime": "2021-05-01 15:50:35.000",
    "endTime": "2021-11-22 15:53:39.000"
}
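In GenerateFlowFile, the request body above normally goes into the Custom Text property, with Data Format set to Text; a minimal sketch of the relevant PROPERTIES, using the real-time request as the value:
Data Format : Text
Custom Text : {"tagNames":["T_UNIT1.P_SELMW","T_UNIT1.P_10FGDTL206Q"]}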
2.2 InvokeHTTP
a) Function Overview
Issues the request as POST, with the incoming flow file as the request body. The response is the result data set, which is passed downstream as a flow file for parsing.
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
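A minimal sketch of the key PROPERTIES, assuming the data service is reached over plain HTTP (host, port, and path below are placeholders, not the real endpoint):
HTTP Method : POST
Remote URL : http://<host>:<port>/<query-endpoint>
Content-Type : application/json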
2.3 JoltTransformJSON
a) Function Overview
Transforms the JSON response with a JOLT shift specification, renaming the response fields (tag name, Time, Value) to tagName/pointTime/valueDouble so the target side consumes a uniform structure.
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
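A minimal sketch of the PROPERTIES, assuming the default chained DSL; the Jolt Specification value is one of the two rules below:
Jolt Transformation DSL : Chained
Jolt Specification : (real-time or historical rule, as appropriate)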
Real-time parsing rule:
[{
    "operation": "shift",
    "spec": {
        "Result": {
            "*": {
                "TagFullName": "&2[#2].tagName",
                "Time": "&2[#2].pointTime",
                "Value": "&2[#2].valueDouble"
            }
        }
    }
}]
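For illustration, the rule above reshapes a real-time response of the following form (timestamps and values are made up) into the renamed structure the target side consumes:
Input:
{
    "Result": [
        { "TagFullName": "T_UNIT1.P_SELMW", "Time": "2021-05-01 15:50:35.000", "Value": 123.4 }
    ]
}
Output:
{
    "Result": [
        { "tagName": "T_UNIT1.P_SELMW", "pointTime": "2021-05-01 15:50:35.000", "valueDouble": 123.4 }
    ]
}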
Historical parsing rule:
[{
    "operation": "shift",
    "spec": {
        "Result": {
            "*": {
                "*": {
                    "$1": "&2[#2].tagName",
                    "Time": "&2[#2].pointTime",
                    "Value": "&2[#2].valueDouble"
                }
            }
        }
    }
}]
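Here the response is assumed to nest an array of samples under each tag name, and "$1" copies that key into every record; illustratively (timestamps and values made up):
Input:
{
    "Result": {
        "T_UNIT1.P_SELMW": [
            { "Time": "2021-05-01 15:50:35.000", "Value": 123.4 }
        ]
    }
}
Output:
{
    "T_UNIT1.P_SELMW": [
        { "tagName": "T_UNIT1.P_SELMW", "pointTime": "2021-05-01 15:50:35.000", "valueDouble": 123.4 }
    ]
}
Both output shapes work downstream, because the Groovy script iterates data.values() and only relies on getting a map of record arrays.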
2.4 Output
Output port that hands the transformed JSON out of the Process Group toward the target side.
II. Target Side
1. Detailed Flow Structure
1.1 Outer Layer (Process Group)
1.2 Inner Layer
2. Processors
2.1 NifiFlow
a) Function Overview
b) SETTINGS
2.2 ExecuteGroovyScript
a) Function Overview
Reads the parsed JSON from incoming flow files in batches and writes the tag values into TDengine over JDBC, using a Druid connection pool.
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
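The script reads two dynamic properties of this processor (the names come from the script itself; the URL shown is a placeholder):
TDUrl : the TDengine JDBC URL, e.g. jdbc:TAOS://<td-host>:6030/<database>
STABLE : optional; the super table name. When omitted, the script falls back to the prefix of each tag name.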
e) Processing script (Script Body):
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.Statement
import java.sql.SQLException
import java.sql.DriverManager
import com.alibaba.druid.pool.DruidDataSource
// fetch up to 200 flow files per batch
def flowFileList = session.get(200)
if (!flowFileList) return
Connection conn = null
Statement statement = null
JsonSlurper jsonSlurper = new JsonSlurper()
String url = TDUrl        // TDUrl: dynamic property holding the TDengine JDBC URL
String sql = "INSERT INTO "
Integer count = 1
// Build a Druid pool and return a connection. Script-level locals (conn,
// flowFileList) are not visible inside a method, so the connection is
// returned rather than assigned here.
def getConnect(String jdbcUrl) {
    DruidDataSource dataSource = new DruidDataSource()
    // jdbc properties
    dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver")
    dataSource.setUrl(jdbcUrl)
    dataSource.setUsername("root")
    dataSource.setPassword("taosdata")
    // pool configuration
    dataSource.setInitialSize(10)
    dataSource.setMinIdle(10)
    dataSource.setMaxActive(10)
    dataSource.setMaxWait(30000)
    dataSource.setValidationQuery("select server_status()")
    return dataSource.getConnection()
}
// get a connection; route the whole batch to failure if it cannot be opened
try {
    conn = getConnect(url)
} catch (Exception e) {
    log.error("connect error: " + e.message)
    session.transfer(flowFileList, REL_FAILURE)
    return
}
for (flowFile in flowFileList) {
    // read the flow file content as a UTF-8 string
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    if (content == "") { continue }
    // parse the JSON produced by JoltTransformJSON: a map of record arrays
    def data = jsonSlurper.parseText(content)
    data.values().each {
        it.each { json ->
            // skip empty or incomplete records
            if (json == null || json == "") { return true }
            Double value = json.valueDouble
            String time = json.pointTime
            String tagName = json.tagName
            if (value == null || time == null || tagName == null) { return true }
            String newTagName = tagName.replace(".", "_")
            // resolve the super table: prefer the STABLE dynamic property; otherwise
            // fall back to the tag-name prefix (expected pattern: superTable.childTable)
            String sTable
            try {
                sTable = STABLE
            } catch (MissingPropertyException e) {
                sTable = tagName.split("\\.")[0]
                if (sTable == tagName) {
                    log.error("No super table specified and the tag name contains no super-table prefix (expected pattern: superTable.childTable); please set the STABLE property!")
                    return true
                }
            }
            // flush the accumulated multi-row insert every 200 records
            if (count % 200 == 0) {
                statement = conn.createStatement()
                statement.executeUpdate(sql)
                statement.close()
                sql = "INSERT INTO "
            }
            // the tag value is a string, so it is quoted inside TAGS (...)
            String valueSql = String.format("%s USING %s TAGS ('%s') VALUES ('%s',%s) ", newTagName, sTable, newTagName, time, value)
            sql = sql.concat(valueSql)
            count++
        }
    }
    // flush whatever remains for this flow file and reset the buffer
    if (sql != "INSERT INTO ") {
        statement = conn.createStatement()
        statement.executeUpdate(sql)
        statement.close()
        sql = "INSERT INTO "
    }
}
// close resources (statement first, then connection)
if (statement != null) { statement.close() }
if (conn != null) { conn.close() }
// route the processed batch to success
session.transfer(flowFileList, REL_SUCCESS)
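For reference, each flush executes one multi-row TDengine insert of the following shape (tag points taken from the request examples above; timestamp and values are illustrative):
INSERT INTO T_UNIT1_P_SELMW USING T_UNIT1 TAGS ('T_UNIT1_P_SELMW') VALUES ('2021-05-01 15:50:35.000',123.4) T_UNIT1_P_10FGDTL206Q USING T_UNIT1 TAGS ('T_UNIT1_P_10FGDTL206Q') VALUES ('2021-05-01 15:50:35.000',55.1)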