1、前置条件:

1.1 集群1有kafka和nifi

1.2 集群2有nifi和TD

2、集群1消费kafka

2.1 添加process group

image.png

2.2 进入process group添加consumerkafka和output!

port

image.png

2.3 consumerKafka配置如下:

image.png

3、集群2消费集群1

3.1 集群2 添加remote process group

image.png
3.2 remote process group配置如下:(urls=集群1下nifi地址,多个地址以逗号隔开,传输协议为HTTP)
image.png

3.3 添加ExecuteGroovyScript处理器,脚本如下:

  1. import groovy.json.JsonSlurper
  2. import java.nio.charset.StandardCharsets
  3. import org.apache.commons.io.IOUtils
  4. import java.sql.Connection
  5. import java.sql.PreparedStatement
  6. import java.sql.SQLException
  7. import java.sql.DriverManager
  8. flowFileList=session.get(150)
  9. if (!flowFileList)return
  10. // 声明容器存储结果
  11. // StringBuffer stringBuffer = new StringBuffer()
  12. // 创建方法实现连接TAOS_TDengine
  13. def getConnect(String jdbcUrl)
  14. {
  15. Class.forName("com.taosdata.jdbc.TSDBDriver")
  16. Connection conn = DriverManager.getConnection(jdbcUrl)
  17. if(null == conn)
  18. {
  19. throw new SQLException("数据库创建失败,请检查配置信息")
  20. session.transfer(flowFile, REL_FAILURE)
  21. }
  22. return conn
  23. }
  24. //判断获取到的150个flowFile不为空
  25. if ( ! flowFileList.isEmpty() ) {
  26. // 调用方法注入url获取连接
  27. String url = String.format("jdbc:TAOS://%s/%s?user=root&password=taosdata", IPPort, DBName)
  28. // 这种执行方式需要表存在(先创建表)
  29. String sql = String.format("INSERT INTO ")
  30. Connection conn = getConnect(url)
  31. flowFileList.each { // 遍历
  32. flowFile ->
  33. if (flowFile != null) {
  34. String text = ''
  35. session.read(
  36. flowFile, {
  37. inputStream ->
  38. text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  39. } as InputStreamCallback
  40. )
  41. String[] data = text.split(",")
  42. String[] lists = data[0].split("\\.")
  43. String newTagName=lists[0].concat("_").concat(lists[1])
  44. // 拼接sql语句
  45. String valueSql = String.format("%s.%s VALUES ('%s',%s) ", DBName, newTagName, data[1], data[2] )
  46. sql = sql.concat(valueSql)
  47. }
  48. }
  49. sql = sql.concat(";")
  50. // 执行
  51. ps = conn.prepareStatement(sql)
  52. ps.executeUpdate()
  53. ps.close()
  54. // close
  55. if ( ! conn ) { conn.close() }
  56. if ( ! ps ) { ps.close() }
  57. }
  58. /*
  59. newFlowFile = session.create()
  60. stringBuffer.append(sql)
  61. stringBuffer.append(url)
  62. // 写入newFlowFile中
  63. session.write(
  64. newFlowFile, {
  65. outputStream ->
  66. outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
  67. } as OutputStreamCallback
  68. )
  69. // newFlowFile --> success
  70. session.transfer(newFlowFile, REL_SUCCESS)
  71. */
  72. // flowFile --> success
  73. session.transfer(flowFileList,REL_FAILURE)

3.4 详细如下:

image.png

4、总揽
QQ20210608-0.png

4、在nifi上用groovy脚本调用TDjdbc注意事项

4.1 需要把对应jdbc.jar包cp到${NIFI_HOME}/lib下

4.2 libtaos.so文件设置环境变量,如下:

  1. LD_LIBRARY_PATH=/usr/lib
  2. export LD_LIBRARY_PATH