一、源端

image.png

1、GenerateFlowFile

1.1 配置
a) Unique FlowFiles:填写POST请求的Json串
image.png
b) Run Schedule:控制请求频率
image.png

2、InvokeHTTP

a) Remote URL:请求地址
image.png
b) Content-Type:请求头,json类型
image.png

3、SplitJson

a) JsonPath Expression:拆分Json
image.png

4、OutPut

二、目标端

image.png

1、NifiFlow

a) URLs:源端nifi地址,多个逗号分隔
image.png

2、ExecuteGroovyScript

a) Script Body:自定义脚本
b) TDUrl:TD连接地址
image.png
c) Script Body脚本如下:

  1. import groovy.json.JsonSlurper
  2. import java.nio.charset.StandardCharsets
  3. import org.apache.commons.io.IOUtils
  4. import java.sql.Connection
  5. import java.sql.PreparedStatement
  6. import java.sql.SQLException
  7. import java.sql.DriverManager
  8. // 获取流集合
  9. flowFileList=session.get(100000)
  10. // 判断流集合是否为空
  11. if (!flowFileList)return
  12. // 自定义获取连接方法
  13. // com.taosdata.jdbc.rs.RestfulDriver
  14. // com.taosdata.jdbc.TSDBDriver
  15. def getConnect(String jdbcUrl){Class.forName("com.taosdata.jdbc.TSDBDriver")
  16. Connection conn = DriverManager.getConnection(jdbcUrl)
  17. if(null == conn){
  18. throw new SQLException("Database Create Error")
  19. session.transfer(flowFile, REL_FAILURE)
  20. }
  21. return conn
  22. }
  23. // 创建buffer容器
  24. StringBuffer stringBuffer = new StringBuffer()
  25. // 获取连接地址属性内容
  26. String url = TDUrl
  27. // 调用方法获取连接
  28. Connection conn = getConnect(url)
  29. PreparedStatement ps = null
  30. // 判断
  31. def jsonSlurper = new JsonSlurper()
  32. if (!flowFileList.isEmpty() ) {
  33. for ( i = 0; i < 2000; i ++) {
  34. String sql = "INSERT INTO "
  35. for (def j = i*50; j<((i+1)*50);j++) {
  36. flowFile = flowFileList[j];
  37. if ( flowFile == null ) {break};
  38. String text = '';
  39. session.read(
  40. flowFile, {
  41. inputStream ->
  42. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  43. } as InputStreamCallback);
  44. def data = jsonSlurper.parseText(text)
  45. assert data instanceof Map
  46. String tagName = data.TagFullName
  47. String[] tagNameList = tagName.split("\\.")
  48. String newTagName = tagNameList[0]+"_"+tagNameList[1]
  49. String pointTime = data.Time
  50. Double valueDouble = data.Value
  51. Integer type = data.Type
  52. String valueSql = String.format("%s USING shmstb TAGS (%s,%s) VALUES ('%s',%s) ", newTagName, type, newTagName, pointTime, valueDouble )
  53. sql = sql.concat(valueSql)
  54. }
  55. if ( sql == "INSERT INTO ") {break}
  56. sql = sql.concat(";")
  57. ps = conn.prepareStatement(sql)
  58. ps.executeUpdate()
  59. stringBuffer.append(sql)
  60. stringBuffer.append("\r\n")
  61. }
  62. }
  63. // close
  64. if ( conn != null ) { conn.close() }
  65. if ( ps != null ) { ps.close() }
  66. /*
  67. newFlowFile = session.create()
  68. session.write(newFlowFile, {outputStream ->
  69. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  70. } as OutputStreamCallback)
  71. session.transfer(newFlowFile, REL_SUCCESS)
  72. */
  73. //flowFile --> success
  74. session.transfer(flowFileList,REL_FAILURE)