国电电力数据同步传输管控平台
(RestfulToTD流程部署)
目录
一、源端    2
1、详细流程结构    2
2、processor    2
1、GenerateFlowFile    3
2、InvokeHTTP    3
3、SplitJson    4
4、OutPut    5
二、目标端    5
1、详细流程展示    5
2、Processor    5
2.1、NifiFlow    5
2.2、ExecuteGroovyScript    6
一、源端
1、详细流程结构
2、processor
1、GenerateFlowFile
1.1 配置
a) Custom Text:填写POST请求的Json串(需将 Data Format 设为 Text、Unique FlowFiles 设为 false,Custom Text 才会生效)
b) Run Schedule:控制请求频率 
2、InvokeHTTP
a) Remote URL:请求地址
b) Content-Type:请求头,json类型 
3、SplitJson
4、OutPut
二、目标端
1、详细流程展示
2、Processor
2.1、NifiFlow
2.2、ExecuteGroovyScript
a) Script Body:自定义脚本
b) TDUrl:TD连接地址
c) Script Body脚本如下:
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
import java.sql.DriverManager
// Pull up to 100000 queued FlowFiles from the session in a single call.
flowFileList = session.get(100000)
// Nothing was queued (null or empty list): end this run without doing any work.
if (flowFileList == null || flowFileList.isEmpty()) {
    return
}
/**
 * Loads the TDengine JDBC driver and opens a connection.
 *
 * Driver classes noted by the original author as candidates:
 *   com.taosdata.jdbc.rs.RestfulDriver
 *   com.taosdata.jdbc.TSDBDriver   (the one actually loaded here)
 *
 * @param jdbcUrl JDBC URL passed unchanged to DriverManager.getConnection
 * @return an open connection; the caller is responsible for closing it
 * @throws ClassNotFoundException if the driver class is not on the classpath
 * @throws SQLException if the driver cannot connect, or returns no connection
 */
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    // Defensive check kept from the original. The FlowFile transfer that used
    // to follow the throw was unreachable (and referenced names that do not
    // exist in this scope), so it has been removed.
    if (null == conn) {
        throw new SQLException("Database Create Error")
    }
    return conn
}
// Collects every executed statement, one per line. It is only consumed by the
// disabled "emit SQL as a new FlowFile" section near the end of the script.
StringBuffer stringBuffer = new StringBuffer()
// JDBC URL, read from the processor property named TDUrl.
String url = TDUrl
// Number of FlowFiles (rows) folded into one multi-table INSERT statement.
final int BATCH_SIZE = 50
def jsonSlurper = new JsonSlurper()

Connection conn = getConnect(url)
try {
    // collate() slices the list into consecutive groups of at most BATCH_SIZE
    // elements, so no manual index arithmetic or end-of-list check is needed.
    for (batch in flowFileList.collate(BATCH_SIZE)) {
        String sql = "INSERT INTO "
        for (flowFile in batch) {
            // Read the whole FlowFile content as UTF-8 text.
            String text = ''
            session.read(flowFile, { inputStream ->
                text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            } as InputStreamCallback)

            def data = jsonSlurper.parseText(text)
            assert data instanceof Map

            // The table name is built from the first two dot-separated parts
            // of TagFullName, joined with an underscore.
            String tagName = data.TagFullName
            String[] tagNameList = tagName.split("\\.")
            String newTagName = tagNameList[0] + "_" + tagNameList[1]
            String pointTime = data.Time
            Double valueDouble = data.Value
            Integer type = data.Type

            // NOTE(review): values from the JSON payload are interpolated into
            // the SQL text without escaping; confirm the upstream source is trusted.
            String valueSql = String.format("%s USING shmstb TAGS (%s,%s) VALUES ('%s',%s) ", newTagName, type, newTagName, pointTime, valueDouble)
            sql = sql.concat(valueSql)
        }
        sql = sql.concat(";")

        // Close each statement as soon as it has run so none are leaked.
        PreparedStatement ps = conn.prepareStatement(sql)
        try {
            ps.executeUpdate()
        } finally {
            ps.close()
        }
        stringBuffer.append(sql)
        stringBuffer.append("\r\n")
    }
} finally {
    // Always release the connection, including when a statement fails.
    conn.close()
}

/*
newFlowFile = session.create()
session.write(newFlowFile, {outputStream ->
    outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(newFlowFile, REL_SUCCESS)
*/

// NOTE(review): the original comment on this line read "flowFile -> success",
// but the code routes the processed FlowFiles to REL_FAILURE. The routing is
// left unchanged here; confirm which relationship the flow expects.
session.transfer(flowFileList, REL_FAILURE)
 
 
 
 
 
                         
                                

