0、前置条件

0.1 当前节点安装TDengine,
0.2 taos-jdbcdriver-2.0.18.jar cp 到所有 ${NIFI_HOME}/lib下
0.3 注意jar和TDengine的版本匹配
image.png
0.4 添加libtaos.so环境变量

  1. LD_LIBRARY_PATH=/usr/lib
  2. export LD_LIBRARY_PATH

0.5 重启nifi


1、Groovy1:

1.1 根据数据库获取db下面的所有超级表,传出(db,超级表名),Groovy脚本如下:

  1. // 导入相关类
  2. import org.apache.commons.io.IOUtils
  3. import java.nio.charset.StandardCharsets
  4. import java.sql.Connection
  5. import java.sql.PreparedStatement
  6. import java.sql.ResultSet
  7. import java.sql.SQLException
  8. import java.sql.DriverManager
  9. // 获取流文件
  10. // flowFile = session.get()
  11. // if(!flowFile)return
  12. flowFile = session.create()
  13. // 创建方法实现连接TAOS_TDengine
  14. def getConnect(String jdbcUrl)
  15. {
  16. Class.forName("com.taosdata.jdbc.TSDBDriver")
  17. Connection conn = DriverManager.getConnection(jdbcUrl)
  18. if(null == conn)
  19. {
  20. throw new SQLException("数据库创建失败,请检查配置信息")
  21. session.transfer(flowFile, REL_FAILURE)
  22. }
  23. return conn
  24. }
  25. String db = "mhs" //要抽取都库名
  26. // 调用方法注入url获取连接
  27. String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",db)
  28. Connection conn = getConnect(url)
  29. // 处理SQL
  30. PreparedStatement ps = conn.prepareStatement("show stables;")
  31. // 执行
  32. ResultSet rs = ps.executeQuery()
  33. // 声明容器存储结果
  34. StringBuffer stringBuffer = new StringBuffer()
  35. // 遍历结果
  36. while (rs.next()) {
  37. stringBuffer.append(db)
  38. stringBuffer.append(",")
  39. stringBuffer.append(rs.getString(1))
  40. stringBuffer.append("\r\n")
  41. }
  42. // 写入flowFile中
  43. session.write(flowFile, {outputStream ->
  44. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  45. } as OutputStreamCallback)
  46. // flowFile --> success
  47. session.transfer(flowFile, REL_SUCCESS)
  48. rs.close()
  49. ps.close()
  50. conn.close()

image.png
1.2 failure流输出到/dev/null 或者自动终,如下:
image.png
1.3 调大 run schedule,确保只运行一次。

image.png
2、splitText

2.1 获取每一行(db,超级表名)
image.png
2.2 splits输出到下一个Groovy,failure、oraginal输出到/dev/null 或自动终止,如下:
image.png

3、Groovy2:

3.1 获取 (db,超级表名),查询超级表下所有子表,传出 (db,子表名,超级表名),Groovy脚本如下:

  1. // 导入相关类
  2. import org.apache.commons.io.IOUtils
  3. import java.nio.charset.StandardCharsets
  4. import java.sql.Connection
  5. import java.sql.PreparedStatement
  6. import java.sql.ResultSet
  7. import java.sql.SQLException
  8. import java.sql.DriverManager
  9. // 获取流文件
  10. flowFile = session.get()
  11. if(!flowFile)return
  12. // 创建方法实现连接TAOS_TDengine
  13. def getConnect(String jdbcUrl){
  14. Class.forName("com.taosdata.jdbc.TSDBDriver")
  15. Connection conn = DriverManager.getConnection(jdbcUrl)
  16. if(null == conn){
  17. throw new SQLException("数据库创建失败,请检查配置信息")
  18. session.transfer(flowFile, REL_FAILURE)
  19. }
  20. return conn
  21. }
  22. def text = ''
  23. session.read(flowFile, {
  24. inputStream ->
  25. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  26. } as InputStreamCallback
  27. )
  28. String[] data = text.split(",")
  29. // 调用方法注入url获取连接
  30. String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",data[0])
  31. Connection conn = getConnect(url)
  32. // 处理SQL
  33. String sql = String.format("\"select tbname from %s.%s\"",data[0],data[1])
  34. PreparedStatement ps = conn.prepareStatement(sql)
  35. // 执行
  36. ResultSet rs = ps.executeQuery()
  37. // 声明容器存储结果
  38. StringBuffer stringBuffer = new StringBuffer()
  39. // 遍历结果
  40. while (rs.next()) {
  41. stringBuffer.append(data[0])
  42. stringBuffer.append(",")
  43. stringBuffer.append(rs.getString(1))
  44. stringBuffer.append(",")
  45. stringBuffer.append(data[1])
  46. stringBuffer.append("\r\n")
  47. }
  48. newFlowFile = session.create()
  49. // 写入flowFile中
  50. session.write(newFlowFile, {outputStream ->
  51. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  52. } as OutputStreamCallback)
  53. // flowFile --> success
  54. session.transfer(newFlowFile, REL_SUCCESS)
  55. session.transfer(flowFile,REL_FAILURE)
  56. rs.close()
  57. ps.close()
  58. conn.close()

**3.2 failure流输出到/dev/null 或自动终止,同上。success输出到splitText

**

4、splitText:

4.1 获取每一行 (db,子表名,超级表名)
4.2 splits输出到下一个Groovy,failure、oraginal输出到/dev/null 或自动终止

5、Groovy3:

5.1 获取 (db,子表名,超级表名) ,查询子表下的数据,传出查询到的数据(格式:”t1,v1;t2,v2;t3,v3;”),同时以属性的形式传出 (db,子表名,超级表名) 。Groovy脚本如下:

  1. // 导入相关类
  2. import java.nio.charset.StandardCharsets
  3. import java.sql.Connection
  4. import java.sql.PreparedStatement
  5. import java.sql.ResultSet
  6. import java.sql.SQLException
  7. import java.sql.DriverManager
  8. // 获取流文件
  9. flowFile = session.get()
  10. if(!flowFile)return
  11. def text = ''
  12. session.read(flowFile, {
  13. inputStream ->
  14. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  15. } as InputStreamCallback
  16. )
  17. String[] data = text.split(",")
  18. // 创建方法实现连接TAOS_TDengine
  19. def getConnect(String jdbcUrl)
  20. {
  21. Class.forName("com.taosdata.jdbc.TSDBDriver")
  22. Connection conn = DriverManager.getConnection(jdbcUrl)
  23. if(null == conn)
  24. {
  25. throw new SQLException("数据库创建失败,请检查配置信息")
  26. session.transfer(flowFile, REL_FAILURE)
  27. }
  28. return conn
  29. }
  30. // 调用方法注入url获取连接
  31. String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",data[0])
  32. Connection conn = getConnect(url)
  33. // 处理SQL
  34. String sql = String.format("select * from %s.%s;",data[0],data[1])
  35. PreparedStatement ps = conn.prepareStatement(sql)
  36. // 执行
  37. ResultSet rs = ps.executeQuery()
  38. // 声明容器存储结果
  39. StringBuffer stringBuffer = new StringBuffer()
  40. // 遍历结果
  41. while (rs.next()) {
  42. stringBuffer.append(rs.getTimestamp("point_time"))
  43. stringBuffer.append(",")
  44. stringBuffer.append(rs.getDouble("value_double"))
  45. /* stringBuffer.append(",")
  46. stringBuffer.append(rs.getString("tagname"))*/
  47. stringBuffer.append(";")
  48. }
  49. flowFile = session.putAttribute(flowFile, 'DBAndTable', text)
  50. newFlowFile = session.create()
  51. newFlowFile = session.putAttribute(newFlowFile, 'DBAndTable', text)
  52. // 写入flowFile中
  53. session.write(newFlowFile, {outputStream ->
  54. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  55. } as OutputStreamCallback)
  56. // newFlowFile --> success
  57. session.transfer(newFlowFile, REL_SUCCESS)
  58. // flowFile --> failure
  59. session.transfer(flowFile, REL_FAILURE)
  60. rs.close()
  61. ps.close()
  62. conn.close()

5.2 success输出到groovy4,failure输出到/dev/null或自动终止,同上。

6、Groovy4:

6.1 获取Groovy3的flowFile,以“;”拆分,获取长度,以190一条数据的格式执行为一条sql,执行sql语句。脚本如下:

  1. // 导入相关类
  2. import org.apache.commons.io.IOUtils
  3. import java.nio.charset.StandardCharsets
  4. import java.sql.Connection
  5. import java.sql.PreparedStatement
  6. import java.sql.ResultSet
  7. import java.sql.SQLException
  8. import java.sql.DriverManager
  9. // 获取流文件
  10. flowFile = session.get()
  11. if(!flowFile)return
  12. /*
  13. StringBuffer stringBuffer = new StringBuffer()
  14. newFlowFile = session.create()
  15. */
  16. // 获取数据
  17. def text = ''
  18. session.read(flowFile, {inputStream ->
  19. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  20. } as InputStreamCallback)
  21. // 拆分数据
  22. String[] data = text.split(";");
  23. // 获取flowFile传入点属性(db.tagname)tagname=子表
  24. String DBAndTable = flowFile.getAttribute('DBAndTable')
  25. // 拆分
  26. String[] dbTb = DBAndTable.split(",")
  27. // 创建方法实现连接TAOS_TDengine
  28. def getConnect(String jdbcUrl)
  29. {
  30. Class.forName("com.taosdata.jdbc.TSDBDriver")
  31. Connection conn = DriverManager.getConnection(jdbcUrl)
  32. if(null == conn)
  33. {
  34. throw new SQLException("数据库创建失败,请检查配置信息")
  35. session.transfer(flowFile, REL_FAILURE)
  36. }
  37. return conn
  38. }
  39. // 调用方法注入url获取连接
  40. String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", dbTb[0])
  41. Connection conn = getConnect(url)
  42. // 获取数据长度
  43. int length = data.length;
  44. // 这种插入方式当表不存在时创建表
  45. String sql = String.format("INSERT INTO %s USING %s TAGS ('%s') VALUES ", dbTb[1], dbTb[2], dbTb[1]);
  46. // 子表都存在时可以这样插入
  47. // String sql = String.format("INSERT INTO %s VALUES ",sonTabNam);
  48. if ( length > 0 ) { // 判断长度不等于0
  49. // 分批执行
  50. Integer startNumber = 0;
  51. Integer sendNumber = 0;
  52. Integer ln = 190;
  53. // 判断需要执行多少批
  54. Integer count = length/ln;
  55. if (length==0) {
  56. return
  57. }
  58. // 当length % 总长度==0时,count+1
  59. if ( length % ln == 0) {
  60. // 循环count次
  61. for (Integer i = 1; i < count+1; i++) {
  62. startNumber = sendNumber
  63. sendNumber=i*ln
  64. for (Integer j = startNumber; j < sendNumber; j++) {
  65. String line = data[j];
  66. String[] str = line.split(",");
  67. // 处理SQL
  68. String value = String.format("('%s',%s),",str[0],str[1]);
  69. sql = sql.concat(value);
  70. }
  71. // 删除最后一个字符
  72. sql = sql.substring(0, sql.length() - 1);
  73. // 结尾
  74. sql = sql.concat(";");
  75. /*
  76. stringBuffer.append(sql)
  77. // 写入flowFile中
  78. session.write(newFlowFile, {outputStream ->
  79. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  80. } as OutputStreamCallback)
  81. */
  82. // 执行
  83. ps = conn.prepareStatement(sql)
  84. ps.executeUpdate()
  85. }
  86. } else { // 当length % 总长度!=0时,count+2
  87. for (Integer i = 1; i < count+2; i++) {
  88. startNumber = sendNumber;
  89. sendNumber=i*ln;
  90. if (i==count+1) {
  91. sendNumber=length;
  92. }
  93. for (Integer j = startNumber; j < sendNumber; j++) {
  94. String line = data[j];
  95. String[] str = line.split(",");
  96. // 处理SQL
  97. String value = String.format("('%s',%s),",str[0], "0.02");
  98. sql = sql.concat(value);
  99. }
  100. // 删除最后一个字符
  101. sql = sql.substring(0, sql.length() - 1);
  102. // 结尾
  103. sql = sql.concat(";");
  104. /*
  105. stringBuffer.append(sql)
  106. // 写入flowFile中
  107. session.write(newFlowFile, {outputStream ->
  108. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  109. } as OutputStreamCallback)
  110. */
  111. // 执行
  112. ps = conn.prepareStatement(sql)
  113. ps.executeUpdate()
  114. }
  115. }
  116. if (ps !=null) {
  117. ps.close()
  118. }
  119. conn.close()
  120. }
  121. // flowFile --> success
  122. session.transfer(flowFile, REL_FAILURE)
  123. /*
  124. // newLlowFile --> success
  125. session.transfer(newFlowFile, REL_SUCCESS)
  126. */

6.2 success、failure输出到/dev/null或自动终止,同上。

7、总览:

image.png

8、夸集群参见Kafka -> Nifi ->TD

8.1 1~5步在集群1实现,第6步在集群2实现