一、源端

1、结构展示

1.1 外层

image.png

1.2 内层

image.png

2、PROCESS

2.1 ExecuteGroovyScript

a)SETTINGS
image.png
b)SCHEDULING
image.png
c)PROPERTIES
image.png
必填参数
DBList:库名(多个DB逗号分隔)
StartTime:开始时间
EngTime:结束时间
TDUrl:TD链接地址

可选参数
STableList:超级表名(多个超级表名以逗号分隔,多个超级表会交叉匹配多个库)
TableList:子表名(标签点名)(多个子表名逗号分隔,多个子表会交叉匹配多个库)
脚本内容见 附1

二、目标端

1、结构展示

1.1 外层

image.png

1.2 内层

image.png

2、PROCESS

2.1 NIFIFlow
image.png

2.1 ExecuteGroovyScript

a)PROPERTIES
image.png
必填参数
TDUrl:TD链接地址
可选参数
DBName:库名,不指定默认以源端库名进行存储
STableName:超级表名,不指定默认以源端超级表名进行存储

脚本内容见 附2

三、附页:

1、附1:

  1. // TD 到 TD
  2. // Groovy 1
  3. import groovy.json.JsonSlurper
  4. import groovy.json.JsonOutput
  5. import java.nio.charset.StandardCharsets
  6. import org.apache.commons.io.IOUtils
  7. import java.sql.Connection
  8. import java.sql.Statement
  9. import java.sql.ResultSet
  10. import com.alibaba.druid.pool.DruidDataSource
  11. StringBuffer stringBuffer = new StringBuffer()
  12. JsonSlurper jsonSlurper = new JsonSlurper()
  13. JsonOutput jsonOutput = new JsonOutput()
  14. ResultSet rs = null
  15. Connection conn = getConnect()
  16. Statement statement = conn.createStatement()
  17. ArrayList<String> inforList = getConnInfor(statement,rs)
  18. for ( text in inforList) {
  19. def data = jsonSlurper.parseText(text)
  20. String sql = getSQL(data)
  21. rs = statement.executeQuery(sql)
  22. if (rs == null) { return }
  23. ArrayList<String> values = new ArrayList<>()
  24. Integer count = 1
  25. while (rs.next()) {
  26. if ( count % 100000 == 0) {
  27. createNewFlowFile(values,data)
  28. values.clear();
  29. }
  30. String PointTime = rs.getTimestamp("point_time")
  31. Double ValueDouble = rs.getDouble("value_double")
  32. String TagName = getTagName(rs,data.TABLE)
  33. String json = JsonOutput.toJson(pointTime:PointTime,valueDouble:ValueDouble,tagName:TagName)
  34. values.add(json)
  35. count++
  36. }
  37. if (values != null){
  38. createNewFlowFile(values,data)
  39. values.clear();
  40. }
  41. count = 1
  42. }
  43. if (rs != null) {rs.close()}
  44. if (statement != null) {statement.close()}
  45. if (conn != null) {conn.close()}
  46. // 获取连接池
  47. Connection getConnect(){
  48. String url = TDUrl
  49. DruidDataSource dataSource = new DruidDataSource()
  50. // jdbc properties
  51. dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver");
  52. dataSource.setUrl(url);
  53. dataSource.setUsername("root");
  54. dataSource.setPassword("taosdata");
  55. // pool configurations
  56. dataSource.setInitialSize(10);
  57. dataSource.setMinIdle(10);
  58. dataSource.setMaxActive(10);
  59. dataSource.setMaxWait(30000);
  60. dataSource.setValidationQuery("select server_status()");
  61. return dataSource.getConnection()
  62. }
  63. // 获取连接信息
  64. ArrayList<String> getConnInfor(Statement statement,ResultSet rs){
  65. ArrayList<String> dataList = new ArrayList<>()
  66. String startTime = StartTime
  67. String endTime = EndTime
  68. String dbStr,sTableStr,tableStr
  69. String[] dbList,stbList,tbList
  70. try { dbStr = DBList;dbList = dbStr.split(",")} catch (Exception e) { e.printStackTrace()}
  71. try { sTableStr = STableList;stbList = sTableStr.split(",") } catch (Exception e) { e.printStackTrace() }
  72. try {tableStr = TableList;tbList = tableStr.split(",") } catch (Exception e) { e.printStackTrace() }
  73. if (!dbStr) {return}
  74. for (db in dbList) {
  75. if (tableStr) {
  76. if (sTableStr){
  77. for (stb in stbList) {
  78. for (tb in tbList) {
  79. String json = JsonOutput.toJson([DBNAME:db, STABLE:stb, TABLE:tb, STARTTIME:startTime, ENDTIME: endTime])
  80. dataList.add(json.toString())
  81. }
  82. }
  83. } else {
  84. for (tb in tbList) {
  85. String json = JsonOutput.toJson([DBNAME:db, STABLE:"default", TABLE:tb, STARTTIME:startTime, ENDTIME: endTime])
  86. dataList.add(json.toString())
  87. }
  88. }
  89. } else if (sTableStr) {
  90. for (stb in stbList) {
  91. String json = JsonOutput.toJson([DBNAME:db, STABLE:stb, STARTTIME:startTime, ENDTIME: endTime])
  92. dataList.add(json.toString())
  93. }
  94. } else if (dbStr) {
  95. String sql = String.format("show %s.stables;",db)
  96. rs = statement.executeQuery(sql)
  97. if (rs == null) { continue }
  98. while (rs.next()) {
  99. String sTable = rs.getString(1)
  100. String json = JsonOutput.toJson([DBNAME:db, STABLE:sTable, STARTTIME:startTime, ENDTIME: endTime])
  101. dataList.add(json.toString())
  102. }
  103. }
  104. }
  105. return dataList
  106. }
  107. // 获取sql
  108. String getSQL (def data) {
  109. String sql
  110. if (data.TABLE) {
  111. sql = String.format("SELECT * FROM %s.%s WHERE point_time >= '%s' AND point_time <= '%s' ",data.DBNAME,data.TABLE,data.STARTTIME,data.ENDTIME)
  112. } else {
  113. sql = String.format("SELECT * FROM %s.%s WHERE point_time >= '%s' AND point_time <= '%s' ",data.DBNAME,data.STABLE,data.STARTTIME,data.ENDTIME)
  114. }
  115. return sql
  116. }
  117. // 获取tagName
  118. String getTagName(ResultSet rs,String jsonTagName) {
  119. if (jsonTagName) {
  120. return jsonTagName
  121. }
  122. return rs.getString("tagName")
  123. }
  124. // 创建流文件
  125. def createNewFlowFile(ArrayList<String> values,def data){
  126. newFlowFile = session.create()
  127. session.write(newFlowFile, {outputStream ->
  128. outputStream.write(Arrays.toString(values.toArray()).getBytes(StandardCharsets.UTF_8))
  129. } as OutputStreamCallback)
  130. newFlowFile = session.putAttribute(newFlowFile, 'DBNAME', data.DBNAME)
  131. newFlowFile = session.putAttribute(newFlowFile, 'STABLE', data.STABLE)
  132. session.transfer(newFlowFile, REL_SUCCESS)
  133. }

2、附2:

  1. import groovy.json.JsonSlurper
  2. import java.nio.charset.StandardCharsets
  3. import org.apache.commons.io.IOUtils
  4. import java.sql.Connection
  5. import java.sql.Statement
  6. import java.sql.SQLException
  7. import java.sql.DriverManager
  8. import com.alibaba.druid.pool.DruidDataSource
  9. def flowFileList=session.get(1000)
  10. if (!flowFileList)return
  11. Connection conn = null
  12. Statement statement = null
  13. //StringBuffer stringBuffer = new StringBuffer()
  14. def jsonSlurper = new JsonSlurper()
  15. String url = TDUrl
  16. conn = getConnect(url)
  17. Integer count = 1
  18. String sql = "INSERT INTO "
  19. String sTable
  20. def getConnect(String jdbcUrl){
  21. DruidDataSource dataSource = new DruidDataSource()
  22. // jdbc properties
  23. dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver");
  24. dataSource.setUrl(jdbcUrl);
  25. dataSource.setUsername("root");
  26. dataSource.setPassword("taosdata");
  27. // pool configurations
  28. dataSource.setInitialSize(10);
  29. dataSource.setMinIdle(10);
  30. dataSource.setMaxActive(10);
  31. dataSource.setMaxWait(30000);
  32. dataSource.setValidationQuery("select server_status()");
  33. return dataSource.getConnection()
  34. }
  35. try{
  36. sTable=STABLE
  37. }catch(Exception e){
  38. e.printStackTrace()
  39. }
  40. for (flowFile in flowFileList) {
  41. if ( flowFile == null ) { continue }
  42. String AttributeSTable = flowFile.getAttribute('STABLE')
  43. String content = ""
  44. session.read(flowFile, {inputStream ->
  45. content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  46. } as InputStreamCallback)
  47. if (content == "" ) { continue }
  48. def jsonList = jsonSlurper.parseText(content)
  49. for (data in jsonList) {
  50. if ( count % 200 == 0) {
  51. statement = conn.createStatement()
  52. statement.executeUpdate(sql)
  53. // stringBuffer.append(sql)
  54. sql = "INSERT INTO "
  55. }
  56. String tagName = data.tagName
  57. String[] tagNameList = tagName.split("\\.")
  58. String newTagName
  59. if (sTable) {
  60. newTagName = tagName.replace(".","_")
  61. } else {
  62. sTable = AttributeSTable
  63. newTagName = tagName.replace(".","_")
  64. }
  65. String pointTime = data.pointTime
  66. Double valueDouble = data.valueDouble
  67. String valueSql = String.format("%s USING %s TAGS (%s) VALUES ('%s',%s) ", newTagName, sTable, newTagName, pointTime, valueDouble )
  68. sql = sql.concat(valueSql)
  69. count ++
  70. }
  71. }
  72. if ( sql != "INSERT INTO ") {
  73. statement = conn.createStatement()
  74. statement.executeUpdate(sql)
  75. // stringBuffer.append(sql)
  76. }
  77. // close
  78. if ( conn != null ) { conn.close() }
  79. if ( statement != null ) { statement.close() }
  80. //newFlowFile = session.create()
  81. //session.write(newFlowFile, {outputStream ->
  82. // outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  83. //} as OutputStreamCallback)
  84. //session.transfer(newFlowFile, REL_SUCCESS)
  85. // flowFile --> success
  86. session.transfer(flowFileList,REL_FAILURE)