1. Prerequisites:
1.1 Cluster 1 runs Kafka and NiFi
1.2 Cluster 2 runs NiFi and TDengine (TD)
2. Cluster 1 consumes Kafka
2.1 Add a process group
2.2 Inside the process group, add a ConsumeKafka processor and an output port
2.3 Configure ConsumeKafka as follows:
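A minimal sketch of the key ConsumeKafka properties (the broker addresses, topic and group id below are placeholders, not values from the original setup):
    Kafka Brokers : kafka-node1:9092,kafka-node2:9092
    Topic Name(s) : <your-topic>
    Group ID      : nifi-td-consumer
    Offset Reset  : latest
The processor's success relationship would typically be routed to the output port added in 2.2.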
3. Cluster 2 consumes from Cluster 1
3.1 Add a remote process group on Cluster 2
3.2 Configure the remote process group as follows (URLs = the NiFi address(es) of Cluster 1, multiple addresses separated by commas; Transport Protocol = HTTP)
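For example (hypothetical hostnames), URLs could be set to http://nifi1.cluster1:8080/nifi,http://nifi2.cluster1:8080/nifi with Transport Protocol = HTTP.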
3.3 Add an ExecuteGroovyScript processor with the following script:
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
import java.sql.DriverManager
// Fetch up to 150 flow files per run
flowFileList = session.get(150)
if (!flowFileList) return
// Container for collecting results (used only by the commented-out debug block below)
// StringBuffer stringBuffer = new StringBuffer()
// Helper method that opens a connection to TDengine (TAOS)
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (conn == null) {
        throw new SQLException("Failed to open the database connection; please check the configuration")
    }
    return conn
}
// Make sure the fetched batch of flow files is not empty
if (!flowFileList.isEmpty()) {
    // Build the JDBC URL and open a connection.
    // IPPort and DBName are expected to be supplied to the script
    // (e.g. as user-defined properties on the processor, or hard-coded here).
    String url = String.format("jdbc:TAOS://%s/%s?user=root&password=taosdata", IPPort, DBName)
    // This insert style requires the target tables to exist already (create them beforehand)
    String sql = "INSERT INTO "
    Connection conn = getConnect(url)
    // Iterate over the batch
    flowFileList.each { flowFile ->
        if (flowFile != null) {
            String text = ''
            // Read the flow file content as a UTF-8 string
            session.read(flowFile, { inputStream ->
                text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            } as InputStreamCallback)
            // Content is expected to look like "<name>.<field>,<timestamp>,<value>"
            String[] data = text.split(",")
            String[] lists = data[0].split("\\.")
            // Derive the table name from the first column: "<name>.<field>" -> "<name>_<field>"
            String newTagName = lists[0].concat("_").concat(lists[1])
            // Append one sub-clause per flow file: "<db>.<table> VALUES ('<ts>',<value>)"
            String valueSql = String.format("%s.%s VALUES ('%s',%s) ", DBName, newTagName, data[1], data[2])
            sql = sql.concat(valueSql)
        }
    }
    sql = sql.concat(";")
    // Execute the combined multi-table INSERT statement
    PreparedStatement ps = conn.prepareStatement(sql)
    ps.executeUpdate()
    // Release JDBC resources
    ps.close()
    conn.close()
}
/*
// Optional debugging: write the generated SQL and URL into a new flow file
newFlowFile = session.create()
stringBuffer.append(sql)
stringBuffer.append(url)
// Write into newFlowFile
session.write(newFlowFile, { outputStream ->
    outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// newFlowFile --> success
session.transfer(newFlowFile, REL_SUCCESS)
*/
// Route the processed flow files to success
session.transfer(flowFileList, REL_SUCCESS)
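As an illustration of what the script assumes about the incoming data (the device and field names below are made up): a flow file whose content is
    device01.temperature,2024-01-01 00:00:00.000,23.5
is split on the comma and the dot, yielding the table name device01_temperature and the clause
    <DBName>.device01_temperature VALUES ('2024-01-01 00:00:00.000',23.5)
which is appended to one combined INSERT INTO ... ; statement covering the whole batch.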
3.4 The detailed setup is as follows:
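One plausible ExecuteGroovyScript setup (an assumption, since the script references IPPort and DBName without defining them): paste the script above into the Script Body property, and supply IPPort (for example td-server:6030) and DBName (the target TDengine database) either as user-defined properties on the processor or by hard-coding them at the top of the script.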
4. Notes on calling the TDengine JDBC driver from a Groovy script in NiFi
4.1 Copy the corresponding TDengine JDBC jar into ${NIFI_HOME}/lib
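For example (the driver file name and version are only illustrative):
    cp taos-jdbcdriver-2.0.xx-dist.jar ${NIFI_HOME}/lib/
NiFi loads the jars in lib at startup, so restart NiFi after copying the driver.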
4.2 Set the environment variable so that libtaos.so can be found, as follows:
LD_LIBRARY_PATH=/usr/lib
export LD_LIBRARY_PATH