Note: historical and real-time data share the same processing flow; they differ only in the JSON format of the POST request and in the Jolt transform spec.

I. Source Side

1. Detailed Flow Structure

1.1 Outer Layer (Process Group)

image.png

1.2 Inner Layer

image.png

2. Processors

2.1 GenerateFlowFile

a) Overview

Generates a flow file from the specified text (the POST request body) and passes it to the next processor (InvokeHTTP).

b) SETTINGS

Default values

c) SCHEDULING

image.png

d) PROPERTIES

image.png

Real-time request format:

    {
        "tagNames":[
            "T_UNIT1.P_SELMW",       // tag point name
            "T_UNIT1.P_10FGDTL206Q"
        ]
    }

Historical request format:

    {
        "tagNames":[
            "T_UNIT1.P_SELMW",
            "T_UNIT1.P_10FGDTL206Q",
            "T_UNIT1.P_10FGDTL205Q",
            "T_UNIT1.P_10FGDTL204Q"
        ],
        "count":4,
        "startTime":"2021-05-01 15:50:35.000",
        "endTime":"2021-11-22 15:53:39.000"
    }

2.2 InvokeHTTP

a) Overview

Sends a POST request whose body is the flow file received from the upstream processor. The response is the result data set, which is passed downstream as a flow file for parsing.
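For debugging outside NiFi, the same POST can be reproduced with a short script. This is only a sketch: the endpoint URL below is a placeholder (the real address is whatever is configured in the processor's Remote URL property), and the body is the real-time request shown above.

```python
import json
import urllib.request

# Placeholder endpoint (assumption) -- substitute the Remote URL
# configured in the InvokeHTTP processor.
URL = "http://example.com/api/realtime"

# The same body that GenerateFlowFile emits for the real-time case.
payload = {"tagNames": ["T_UNIT1.P_SELMW", "T_UNIT1.P_10FGDTL206Q"]}

req = urllib.request.Request(
    URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# urllib.request.urlopen(req) would actually send it; here we only build
# and inspect the request object.
print(req.get_method())  # POST
```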

b) SETTINGS

image.png

c) SCHEDULING

Default values
image.png

d) PROPERTIES

image.png
image.png

2.3 JoltTransformJSON

a) Overview

Splits the result data set into individual JSON records.

b) SETTINGS

image.png

c) SCHEDULING

Default values
image.png

d) PROPERTIES

image.png
Real-time transform spec:

    [{
        "operation": "shift",
        "spec": {
            "Result": {
                "*": {
                    "TagFullName": "&2[#2].tagName",
                    "Time": "&2[#2].pointTime",
                    "Value": "&2[#2].valueDouble"
                }
            }
        }
    }]
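The effect of the real-time shift spec can be sketched in plain Python. The sample input below is an assumption about the service's response shape, inferred from the spec (a `Result` array of records carrying `TagFullName`, `Time`, `Value`); the shift renames each field and keeps the records grouped under `Result`, which is the shape the downstream Groovy script iterates over.

```python
def shift_realtime(response):
    # Mirrors the Jolt shift: Result[i].{TagFullName, Time, Value}
    # -> Result[i].{tagName, pointTime, valueDouble}
    return {"Result": [
        {"tagName": r["TagFullName"],
         "pointTime": r["Time"],
         "valueDouble": r["Value"]}
        for r in response.get("Result", [])
    ]}

# Hypothetical sample response (assumed shape).
sample = {"Result": [
    {"TagFullName": "T_UNIT1.P_SELMW",
     "Time": "2021-11-22 15:53:39.000",
     "Value": 600.5},
]}
out = shift_realtime(sample)
```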

Historical transform spec:

    [{
        "operation": "shift",
        "spec": {
            "Result": {
                "*": {
                    "*": {
                        "$1": "&2[#2].tagName",
                        "Time": "&2[#2].pointTime",
                        "Value": "&2[#2].valueDouble"
                    }
                }
            }
        }
    }]

2.4 Output

II. Target Side

1. Detailed Flow Structure

1.1 Outer Layer (Process Group)

image.png

1.2 Inner Layer

image.png

2. Processors

2.1 NifiFlow

a) Overview

Receives data across clusters.

b) SETTINGS

image.png

2.2 ExecuteGroovyScript

a) Overview

Parses the JSON records in the incoming flow files and batch-inserts the data points into TDengine over JDBC.

b) SETTINGS

image.png

c) SCHEDULING

image.png

d) PROPERTIES

image.png
e) Processing script (Script Body):

    import groovy.json.JsonSlurper
    import java.nio.charset.StandardCharsets
    import org.apache.commons.io.IOUtils
    import java.sql.Connection
    import java.sql.Statement
    import com.alibaba.druid.pool.DruidDataSource

    // Build a pooled JDBC connection to TDengine and return it.
    // (Returned instead of assigned to a script-local variable: a method
    // body cannot see locals declared in the script body.)
    def getConnect(String jdbcUrl) {
        DruidDataSource dataSource = new DruidDataSource()
        // jdbc properties
        dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver")
        dataSource.setUrl(jdbcUrl)
        dataSource.setUsername("root")
        dataSource.setPassword("taosdata")
        // pool configuration
        dataSource.setInitialSize(10)
        dataSource.setMinIdle(10)
        dataSource.setMaxActive(10)
        dataSource.setMaxWait(30000)
        dataSource.setValidationQuery("select server_status()")
        return dataSource.getConnection()
    }

    // Fetch up to 200 flow files per run.
    def flowFileList = session.get(200)
    if (!flowFileList) return

    Connection conn = null
    Statement statement = null
    JsonSlurper jsonSlurper = new JsonSlurper()
    String url = TDUrl          // "TDUrl" is a dynamic property on the processor
    String sql = "INSERT INTO "
    Integer count = 1

    // get connection
    conn = getConnect(url)
    if (!conn) {
        log.error("connect error")
        session.transfer(flowFileList, REL_FAILURE)
        return
    }

    for (flowFile in flowFileList) {
        // read the flow file content
        String content = ''
        session.read(flowFile, { inputStream ->
            content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        } as InputStreamCallback)
        if (content == "") { continue }
        // parse the json
        def data = jsonSlurper.parseText(content)
        data.values().each {
            it.each { json ->
                if (json == null || json == "") { return true }
                Double value = json.valueDouble
                String time = json.pointTime
                String tagName = json.tagName
                if (value == null || time == null || tagName == null) { return true }
                String newTagName = tagName.replace(".", "_")
                String sTable
                try {
                    sTable = STABLE     // "STABLE" is a dynamic property on the processor
                } catch (Exception ignored) {
                    try {
                        sTable = tagName.split("\\.")[0]
                    } catch (NullPointerException e) {
                        log.error("No super table name was given and none could be derived from the tag name (expected pattern: superTable.childTable); please set the super table name!")
                        e.printStackTrace()
                    }
                }
                // flush every 200 rows
                if (count % 200 == 0) {
                    statement = conn.createStatement()
                    statement.executeUpdate(sql)
                    sql = "INSERT INTO "
                }
                String valueSql = String.format("%s USING %s TAGS (%s) VALUES ('%s',%s) ",
                        newTagName, sTable, newTagName, time, value)
                sql = sql.concat(valueSql)
                count++
            }
        }
        // flush whatever remains for this flow file
        if (sql != "INSERT INTO ") {
            statement = conn.createStatement()
            statement.executeUpdate(sql)
            sql = "INSERT INTO "    // reset so the batch is not re-executed for the next flow file
        }
    }

    // close resources (statement before connection)
    if (statement != null) { statement.close() }
    if (conn != null) { conn.close() }

    // flowFile --> success
    session.transfer(flowFileList, REL_SUCCESS)
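The SQL the script assembles uses TDengine's multi-table insert with automatic table creation (`INSERT INTO subtable USING stable TAGS (...) VALUES (...)`), concatenating one clause per point and flushing every 200 rows. The assembly step can be sketched in Python with hypothetical records:

```python
def build_insert(records, stable):
    # One INSERT INTO statement, with one
    # "sub_table USING stable TAGS (...) VALUES (...)" clause per record,
    # mirroring the string concatenation in the Groovy script.
    sql = "INSERT INTO "
    for rec in records:
        sub_table = rec["tagName"].replace(".", "_")
        sql += "%s USING %s TAGS (%s) VALUES ('%s',%s) " % (
            sub_table, stable, sub_table, rec["pointTime"], rec["valueDouble"])
    return sql

# Hypothetical records in the shape the Jolt stage produces.
records = [
    {"tagName": "T_UNIT1.P_SELMW",
     "pointTime": "2021-11-22 15:53:39.000",
     "valueDouble": 600.5},
]
print(build_insert(records, "T_UNIT1"))
```

Note that the script passes the sub-table name as the tag value without quotes; depending on the tag's declared type in the TDengine super table, the tag value may need to be quoted.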