一、源端
1、GenerateFlowFile
1.1 配置
a) Custom Text:填写POST请求的Json串(Unique FlowFiles 需设置为 false,Custom Text 才会作为FlowFile内容)
b) Run Schedule:控制请求频率
2、InvokeHTTP
a) Remote URL:请求地址
b) Content-Type:请求头,json类型
3、SplitJson
4、OutPut
二、目标端
1、NifiFlow
2、ExecuteGroovyScript
a) Script Body:自定义脚本
b) TDUrl:TD连接地址
c) Script Body脚本如下:
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
import java.sql.DriverManager
// Pull up to 100000 queued flow files from the session in a single call
flowFileList = session.get(100000)
// Stop the script right away when the session handed back nothing
if (flowFileList == null || flowFileList.isEmpty()) {
    return
}
/**
 * Loads the TDengine JDBC driver class and opens a connection to the given URL.
 *
 * Driver class names noted by the original author:
 *   com.taosdata.jdbc.rs.RestfulDriver
 *   com.taosdata.jdbc.TSDBDriver   (the one loaded here)
 *
 * @param jdbcUrl JDBC URL handed to DriverManager.getConnection
 * @return the opened connection; never null
 * @throws SQLException if DriverManager fails or yields no connection
 * @throws ClassNotFoundException if the driver class is not on the classpath
 */
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        // No flow-file routing here: this helper has no flow file in hand,
        // so the caller decides what to do when the exception surfaces.
        throw new SQLException("Database Create Error")
    }
    return conn
}
// Collects every executed SQL statement; it is only consumed by the
// disabled debug-output block near the end of this script.
StringBuffer stringBuffer = new StringBuffer()
// Connection URL taken from the TDUrl property
String url = TDUrl
// Number of flow files folded into one multi-table INSERT statement
final int batchSize = 50
final String insertPrefix = "INSERT INTO "
def jsonSlurper = new JsonSlurper()

// Open the connection through the helper
Connection conn = getConnect(url)
try {
    // Walk the fetched flow files in slices of batchSize
    for (int start = 0; start < flowFileList.size(); start += batchSize) {
        int end = Math.min(start + batchSize, flowFileList.size())
        StringBuilder sqlBuilder = new StringBuilder(insertPrefix)
        for (int j = start; j < end; j++) {
            def flowFile = flowFileList[j]
            if (flowFile == null) { break }
            // Read the whole flow-file content as UTF-8 text
            String text = ''
            session.read(flowFile, { inputStream ->
                text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            } as InputStreamCallback)
            def data = jsonSlurper.parseText(text)
            assert data instanceof Map
            // The sub-table name is built from the first two dot-separated parts
            // of TagFullName; a name without a '.' fails on index 1.
            String tagName = data.TagFullName
            String[] tagNameList = tagName.split("\\.")
            String newTagName = tagNameList[0] + "_" + tagNameList[1]
            String pointTime = data.Time
            Double valueDouble = data.Value
            Integer type = data.Type
            // NOTE(review): values from the flow-file JSON are formatted straight
            // into the SQL text; confirm the upstream source is trusted or
            // validate TagFullName / Time before this point.
            String valueSql = String.format("%s USING shmstb TAGS (%s,%s) VALUES ('%s',%s) ", newTagName, type, newTagName, pointTime, valueDouble)
            sqlBuilder.append(valueSql)
        }
        // Nothing was appended for this slice: stop, as there is nothing to execute
        if (sqlBuilder.length() == insertPrefix.length()) { break }
        sqlBuilder.append(";")
        String sql = sqlBuilder.toString()
        // Close each statement as soon as its batch has run, so that no
        // statement outlives its batch or the connection.
        PreparedStatement ps = conn.prepareStatement(sql)
        try {
            ps.executeUpdate()
        } finally {
            ps.close()
        }
        stringBuffer.append(sql)
        stringBuffer.append("\r\n")
    }
} finally {
    // Always release the connection, including when a batch throws
    conn.close()
}
/*
newFlowFile = session.create()
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(newFlowFile, REL_SUCCESS)
*/
// Reached only when every batch executed without an exception:
// route the processed flow files to success.
session.transfer(flowFileList, REL_SUCCESS)