国电电力数据同步传输管控平台
(RestfulToTD流程部署)
目录
一、源端 2
1、详细流程结构 2
2、processor 2
1、GenerateFlowFile 3
2、InvokeHTTP 3
3、SplitJson 4
4、OutPut 5
二、目标端 5
1、详细流程展示 5
2、Processor 5
2.1、NifiFlow 5
2.2、ExecuteGroovyScript 6

一、源端

1、详细流程结构

07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图1

2、processor

1、GenerateFlowFile

1.1 配置
a) Unique FlowFiles:填写POST请求的Json串
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图2
b) Run Schedule:控制请求频率
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图3

2、InvokeHTTP

a) Remote URL:请求地址
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图4
b) Content-Type:请求头,json类型
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图5

3、SplitJson

a) JsonPath Expression:拆分Json
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图6

4、OutPut

二、目标端

1、详细流程展示

07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图7

2、Processor

2.1、NifiFlow

a) URLs:源端nifi地址,多个逗号分隔
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图8

2.2、ExecuteGroovyScript

a) Script Body:自定义脚本
b) TDUrl:TD连接地址
07_国电电力数据同步传输管控平台-RestfulToTD流程部署v1.0 - 图9
c) Script Body脚本如下:
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
import java.sql.DriverManager
// 获取流集合
flowFileList=session.get(100000)
// 判断流集合是否为空
if (!flowFileList)return
// 自定义获取连接方法
// com.taosdata.jdbc.rs.RestfulDriver
// com.taosdata.jdbc.TSDBDriver
def getConnect(String jdbcUrl){Class.forName(“com.taosdata.jdbc.TSDBDriver”)
Connection conn = DriverManager.getConnection(jdbcUrl)
if(null == conn){
throw new SQLException(“Database Create Error”)
session.transfer(flowFile, RELFAILURE)
}
return conn
}
// 创建buffer容器
StringBuffer stringBuffer = new StringBuffer()
// 获取连接地址属性内容
String url = TDUrl
// 调用方法获取连接
Connection conn = getConnect(url)
PreparedStatement ps = null
// 判断
def jsonSlurper = new JsonSlurper()
if (!flowFileList.isEmpty() ) {
for ( i = 0; i < 2000; i ++) {
String sql = “INSERT INTO “
for (def j = i50; j<((i+1)50);j++) {
flowFile = flowFileList[j];
if ( flowFile == null ) {break};
String text = ‘’;
session.read(
flowFile, {
inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback);
def data = jsonSlurper.parseText(text)
assert data instanceof Map
String tagName = data.TagFullName
String[] tagNameList = tagName.split(“\.”)
String newTagName = tagNameList[0]+”
“+tagNameList[1]
String pointTime = data.Time
Double valueDouble = data.Value
Integer type = data.Type
String valueSql = String.format(“%s USING shmstb TAGS (%s,%s) VALUES (‘%s’,%s) “, newTagName, type, newTagName, pointTime, valueDouble )
sql = sql.concat(valueSql)
}
if ( sql == “INSERT INTO “) {break}
sql = sql.concat(“;”)
ps = conn.prepareStatement(sql)
ps.executeUpdate()
stringBuffer.append(sql)
stringBuffer.append(“\r\n”)
}
}
// close
if ( conn != null ) { conn.close() }
if ( ps != null ) { ps.close() }
/
newFlowFile = session.create()
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(newFlowFile, REL_SUCCESS)
/
//flowFile —> success
session.transfer(flowFileList,REL_FAILURE)