0、前置条件
0.1 当前节点安装TDengine,
0.2 taos-jdbcdriver-2.0.18.jar cp 到所有 ${NIFI_HOME}/lib下
0.3 注意jar和TDengine的版本匹配
0.4 添加libtaos.so环境变量
LD_LIBRARY_PATH=/usr/lib
export LD_LIBRARY_PATH
1、Groovy1:
1.1 根据数据库获取db下面的所有超级表,传出(db,超级表名),Groovy脚本如下:
// 导入相关类
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.DriverManager
// 获取流文件
// flowFile = session.get()
// if(!flowFile)return
flowFile = session.create()
// 创建方法实现连接TAOS_TDengine
def getConnect(String jdbcUrl)
{
Class.forName("com.taosdata.jdbc.TSDBDriver")
Connection conn = DriverManager.getConnection(jdbcUrl)
if(null == conn)
{
throw new SQLException("数据库创建失败,请检查配置信息")
session.transfer(flowFile, REL_FAILURE)
}
return conn
}
String db = "mhs" //要抽取都库名
// 调用方法注入url获取连接
String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",db)
Connection conn = getConnect(url)
// 处理SQL
PreparedStatement ps = conn.prepareStatement("show stables;")
// 执行
ResultSet rs = ps.executeQuery()
// 声明容器存储结果
StringBuffer stringBuffer = new StringBuffer()
// 遍历结果
while (rs.next()) {
stringBuffer.append(db)
stringBuffer.append(",")
stringBuffer.append(rs.getString(1))
stringBuffer.append("\r\n")
}
// 写入flowFile中
session.write(flowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// flowFile --> success
session.transfer(flowFile, REL_SUCCESS)
rs.close()
ps.close()
conn.close()
1.2 failure流输出到/dev/null 或者自动终,如下:
1.3 调大 run schedule,确保只运行一次。
2、splitText
2.1 获取每一行(db,超级表名)
2.2 splits输出到下一个Groovy,failure、oraginal输出到/dev/null 或自动终止,如下:
3、Groovy2:
3.1 获取 (db,超级表名),查询超级表下所有子表,传出 (db,子表名,超级表名),Groovy脚本如下:
// 导入相关类
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.DriverManager
// 获取流文件
flowFile = session.get()
if(!flowFile)return
// 创建方法实现连接TAOS_TDengine
def getConnect(String jdbcUrl){
Class.forName("com.taosdata.jdbc.TSDBDriver")
Connection conn = DriverManager.getConnection(jdbcUrl)
if(null == conn){
throw new SQLException("数据库创建失败,请检查配置信息")
session.transfer(flowFile, REL_FAILURE)
}
return conn
}
def text = ''
session.read(flowFile, {
inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback
)
String[] data = text.split(",")
// 调用方法注入url获取连接
String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",data[0])
Connection conn = getConnect(url)
// 处理SQL
String sql = String.format("\"select tbname from %s.%s\"",data[0],data[1])
PreparedStatement ps = conn.prepareStatement(sql)
// 执行
ResultSet rs = ps.executeQuery()
// 声明容器存储结果
StringBuffer stringBuffer = new StringBuffer()
// 遍历结果
while (rs.next()) {
stringBuffer.append(data[0])
stringBuffer.append(",")
stringBuffer.append(rs.getString(1))
stringBuffer.append(",")
stringBuffer.append(data[1])
stringBuffer.append("\r\n")
}
newFlowFile = session.create()
// 写入flowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// flowFile --> success
session.transfer(newFlowFile, REL_SUCCESS)
session.transfer(flowFile,REL_FAILURE)
rs.close()
ps.close()
conn.close()
**3.2 failure流输出到/dev/null 或自动终止,同上。success输出到splitText
4、splitText:
4.1 获取每一行 (db,子表名,超级表名)
4.2 splits输出到下一个Groovy,failure、oraginal输出到/dev/null 或自动终止
5、Groovy3:
5.1 获取 (db,子表名,超级表名) ,查询子表下的数据,传出查询到的数据(格式:”t1,v1;t2,v2;t3,v3;”),同时以属性的形式传出 (db,子表名,超级表名) 。Groovy脚本如下:
// 导入相关类
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.DriverManager
// 获取流文件
flowFile = session.get()
if(!flowFile)return
def text = ''
session.read(flowFile, {
inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback
)
String[] data = text.split(",")
// 创建方法实现连接TAOS_TDengine
def getConnect(String jdbcUrl)
{
Class.forName("com.taosdata.jdbc.TSDBDriver")
Connection conn = DriverManager.getConnection(jdbcUrl)
if(null == conn)
{
throw new SQLException("数据库创建失败,请检查配置信息")
session.transfer(flowFile, REL_FAILURE)
}
return conn
}
// 调用方法注入url获取连接
String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata",data[0])
Connection conn = getConnect(url)
// 处理SQL
String sql = String.format("select * from %s.%s;",data[0],data[1])
PreparedStatement ps = conn.prepareStatement(sql)
// 执行
ResultSet rs = ps.executeQuery()
// 声明容器存储结果
StringBuffer stringBuffer = new StringBuffer()
// 遍历结果
while (rs.next()) {
stringBuffer.append(rs.getTimestamp("point_time"))
stringBuffer.append(",")
stringBuffer.append(rs.getDouble("value_double"))
/* stringBuffer.append(",")
stringBuffer.append(rs.getString("tagname"))*/
stringBuffer.append(";")
}
flowFile = session.putAttribute(flowFile, 'DBAndTable', text)
newFlowFile = session.create()
newFlowFile = session.putAttribute(newFlowFile, 'DBAndTable', text)
// 写入flowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
// newFlowFile --> success
session.transfer(newFlowFile, REL_SUCCESS)
// flowFile --> failure
session.transfer(flowFile, REL_FAILURE)
rs.close()
ps.close()
conn.close()
5.2 success输出到groovy4,failure输出到/dev/null或自动终止,同上。
6、Groovy4:
6.1 获取Groovy3的flowFile,以“;”拆分,获取长度,以190一条数据的格式执行为一条sql,执行sql语句。脚本如下:
// 导入相关类
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.DriverManager
// 获取流文件
flowFile = session.get()
if(!flowFile)return
/*
StringBuffer stringBuffer = new StringBuffer()
newFlowFile = session.create()
*/
// 获取数据
def text = ''
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
// 拆分数据
String[] data = text.split(";");
// 获取flowFile传入点属性(db.tagname)tagname=子表
String DBAndTable = flowFile.getAttribute('DBAndTable')
// 拆分
String[] dbTb = DBAndTable.split(",")
// 创建方法实现连接TAOS_TDengine
def getConnect(String jdbcUrl)
{
Class.forName("com.taosdata.jdbc.TSDBDriver")
Connection conn = DriverManager.getConnection(jdbcUrl)
if(null == conn)
{
throw new SQLException("数据库创建失败,请检查配置信息")
session.transfer(flowFile, REL_FAILURE)
}
return conn
}
// 调用方法注入url获取连接
String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", dbTb[0])
Connection conn = getConnect(url)
// 获取数据长度
int length = data.length;
// 这种插入方式当表不存在时创建表
String sql = String.format("INSERT INTO %s USING %s TAGS ('%s') VALUES ", dbTb[1], dbTb[2], dbTb[1]);
// 子表都存在时可以这样插入
// String sql = String.format("INSERT INTO %s VALUES ",sonTabNam);
if ( length > 0 ) { // 判断长度不等于0
// 分批执行
Integer startNumber = 0;
Integer sendNumber = 0;
Integer ln = 190;
// 判断需要执行多少批
Integer count = length/ln;
if (length==0) {
return
}
// 当length % 总长度==0时,count+1
if ( length % ln == 0) {
// 循环count次
for (Integer i = 1; i < count+1; i++) {
startNumber = sendNumber
sendNumber=i*ln
for (Integer j = startNumber; j < sendNumber; j++) {
String line = data[j];
String[] str = line.split(",");
// 处理SQL
String value = String.format("('%s',%s),",str[0],str[1]);
sql = sql.concat(value);
}
// 删除最后一个字符
sql = sql.substring(0, sql.length() - 1);
// 结尾
sql = sql.concat(";");
/*
stringBuffer.append(sql)
// 写入flowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
*/
// 执行
ps = conn.prepareStatement(sql)
ps.executeUpdate()
}
} else { // 当length % 总长度!=0时,count+2
for (Integer i = 1; i < count+2; i++) {
startNumber = sendNumber;
sendNumber=i*ln;
if (i==count+1) {
sendNumber=length;
}
for (Integer j = startNumber; j < sendNumber; j++) {
String line = data[j];
String[] str = line.split(",");
// 处理SQL
String value = String.format("('%s',%s),",str[0], "0.02");
sql = sql.concat(value);
}
// 删除最后一个字符
sql = sql.substring(0, sql.length() - 1);
// 结尾
sql = sql.concat(";");
/*
stringBuffer.append(sql)
// 写入flowFile中
session.write(newFlowFile, {outputStream ->
outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
*/
// 执行
ps = conn.prepareStatement(sql)
ps.executeUpdate()
}
}
if (ps !=null) {
ps.close()
}
conn.close()
}
// flowFile --> success
session.transfer(flowFile, REL_FAILURE)
/*
// newLlowFile --> success
session.transfer(newFlowFile, REL_SUCCESS)
*/
6.2 success、failure输出到/dev/null或自动终止,同上。
7、总览:
8、夸集群参见Kafka -> Nifi ->TD
8.1 1~5步在集群1实现,第6步在集群2实现