1、参考文献
https://www.taosdata.com/cn/documentation/
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
一、TD下载安装
参考TAOS官方文档:https://www.taosdata.com/blog/2019/08/09/566.html
1、创建测试超级表,数据库以及插入数据
taos> CREATE DATABASE mhs KEEP 365 DAYS 10 BLOCKS 4 UPDATE 1;
taos> use mhs;
taos> CREATE STABLE mhsfj (point_time timestamp, value_double double) TAGS (tagname binary(64));
taos> INSERT INTO MHSDJL_NX_GD_FG_L1_P1_001_ASF234 USING MHSDJL TAGS ("MHSDJL_NX_GD_FG_L1_P1_001_ASF234") VALUES (now,0.1234);
-------
创建子表
CREATE TABLE IF NOT EXISTS MHSFJ_NX_GD_MHSF_FJ_P2_L4_040_AI0062 USING mhsfj TAGS ("MHSFJ_NX_GD_MHSF_FJ_P2_L4_040_AI0062");
CREATE TABLE IF NOT EXISTS MHSDJL_NX_GD_FG_L1_P1_001_ASF234 USING mhsdjl TAGS ("MHSDJL_NX_GD_FG_L1_P1_001_ASF234");
如下错误:
sudo: /etc/sudoers is world writable
sudo: no valid sudoers sources found, quitting
sudo: unable to initialize policy plugin
解决:
pkexec chmod 555 /etc/sudoers
pkexec chmod 555 /etc/sudoers.d
二、需求:
1、实现nifi上管理读取Taos_TDengine时序数据库
三、实现
1、Nifi使用ExecuteSQL处理器实现查询TAOS_TDengine遇到问题,如下:
ExecuteSQL[id=83ba910e 0179-1000-765e 608004eae304] Unable to execute SQL select query select * from test.meters due to java.sql.sQLFeatureNotSupportedException: this operation is NOT supported currently!. No FlowFile to route to failure: org.apache.nifi.processor.exception.ProcessException: java.sql.SQLFeatureNotSupporedException: this operation is NOT supported currently!
-------------------------------------------------------------------------------------------
ExecuteSQL 无法执行SQL select查询:"select*from test.meters",原因是java.SQL.sQLFeatureNotSupportedException:当前不支持此操作!没有要路由到失败的流文件:org.apache.nifi.processor.exception.ProcessException:java.sql.sqlfeaturenotsupploredexception:当前不支持此操作!
2、Groovy脚本实现查询TAOS_TDengine
- 添加JDBCTaos.jar包到${NIFI_HOME}/lib目录
- 添加libtaos.so环境变量
echo "LD_LIBRARY_PATH=/usr/lib" >> /etc/profile
echo "export LD_LIBRARY_PATH" >> /etc/profile
source /etc/profile
注:添加taosjar到nifilib时,taosjar要从maven下载,且版本跟当前td服务版本一致,不然会出现jni connection is null
- 重启Nifi
- 添加ExecuteGroovyScript处理器
- Groovy脚本写入到ScriptBody一栏,脚本如下: ```groovy // 导入相关类 import java.nio.charset.StandardCharsets import java.sql.Connection import java.sql.PreparedStatement import java.sql.ResultSet import java.sql.SQLException import java.sql.DriverManager import java.sql.PreparedStatement import java.sql.ResultSet import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets
// 获取流文件 // flowFile = session.get() // if(!flowFile)return flowFile = session.create()
// 创建方法实现连接TAOS_TDengine def getConnect(String jdbcUrl) { Class.forName(“com.taosdata.jdbc.TSDBDriver”) Connection conn = DriverManager.getConnection(jdbcUrl) if(null == conn) { throw new SQLException(“数据库创建失败,请检查配置信息”) session.transfer(flowFile, REL_FAILURE) } return conn }
// 调用方法注入url获取连接 Connection conn = getConnect(“jdbc:TAOS://10.168.1.16:6030/test?user=root&password=taosdata”) // 处理SQL PreparedStatement ps = conn.prepareStatement(“select * from meters;”) // 执行 ResultSet rs = ps.executeQuery() // 声明容器存储结果 StringBuffer stringBuffer = new StringBuffer() // 遍历结果 while (rs.next()) { stringBuffer.append(rs.getString(1)) stringBuffer.append(“\n”) }
// 写入flowFile中 session.write(flowFile, {outputStream -> outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
// flowFile —> success session.transfer(flowFile, REL_SUCCESS)
6. **添加PutFile处理器测试接收数据**
![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621502947614-f73f8810-85d5-4f1b-871e-7975897592ee.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=451&id=u2e5cdc98&margin=%5Bobject%20Object%5D&name=image.png&originHeight=902&originWidth=1920&originalType=binary&ratio=1&size=181460&status=done&style=none&taskId=u74bc9d03-87ac-447f-beaf-7bb137bf026&width=960)
7. **执行运行**
![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503019178-74610113-457e-4680-91dd-06a5d35772d6.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=451&id=u0058db50&margin=%5Bobject%20Object%5D&name=image.png&originHeight=902&originWidth=1920&originalType=binary&ratio=1&size=191763&status=done&style=none&taskId=u9a7195e9-812b-465f-8a43-12699c56dae&width=960)
8. **linux查看文件**
![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503157169-28b4d527-79d7-4d25-9ab5-e3c8098e4170.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=151&id=uf7b75624&margin=%5Bobject%20Object%5D&name=image.png&originHeight=302&originWidth=1384&originalType=binary&ratio=1&size=56233&status=done&style=none&taskId=u42fd7ac1-7cf0-4e85-8f1d-07d1ff461b0&width=692)
9. **结果如下**
![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503191124-f2bffeec-a5de-4d0c-a1dc-524136cd30b5.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=143&id=uf39ae047&margin=%5Bobject%20Object%5D&name=image.png&originHeight=285&originWidth=1584&originalType=binary&ratio=1&size=63249&status=done&style=none&taskId=u3e9db516-0c29-4190-a89c-5b2f89ab0d7&width=792)
<a name="jxYGZ"></a>
## 四、nifi groovy常用函数
<a name="aHuSD"></a>
#### 1、从会话中获取传入的流文件
```groovy
flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}
2、从会话中获取多个传入流文件
flowFileList = session.get(100)
if(!flowFileList.isEmpty()){
flowFileList.each {flowFile->
//在这里处理每个FlowFile
}
}
3、创建一个新的FlowFile
flowFile = session.create()
//这里的附加处理flowFile = session.create()
4、从父FlowFile创建新的FlowFile
使用会话对象中的create(parentFlowFile)方法。此方法采用父FlowFile引用并返回新的子FlowFile对象。新创建的FlowFile将继承除UUID之外的所有父属性。此方法将自动生成Provenance FORK事件或Provenance JOIN事件,具体取决于在提交ProcessSession之前是否从同一父级生成其他FlowFiles。
var flowFile = session.get();
if (flowFile != null) {
var newFlowFile = session.create(flowFile);
// Additional processing here
}
5、向流文件添加属性
使用会话对象中的putAttribute(flowFile,attributeKey,attributeValue)方法。此方法使用给定的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。
var flowFile = session.get();
if (flowFile != null) {
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}
6、向流文件添加多个属性
使用会话对象中的putAllAttributes(flowFile,attributeMap)方法。此方法使用给定Map中的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。
var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()
if (flowFile != null) {
flowFile = session.putAllAttributes(flowFile, attrMap)
}
7、从流文件中获取属性
使用FlowFile对象中的getAttribute(attributeKey)方法。此方法返回给定attributeKey的String值,如果未找到attributeKey,则返回null。这些示例显示了“filename”属性值的检索。
var flowFile = session.get()
if (flowFile != null) {
var myAttr = flowFile.getAttribute('filename')
}
var flowFile = session.get()
if (flowFile != null) {
var attrs = flowFile.getAttributes();
for each (var attrKey in attrs.keySet()) {
// Do something with attrKey (the key) and/or attrs[attrKey] (the value)
}
}
8、将流文件传输到关系
使用会话对象中的transfer(flowFile,relationship)方法。从文档中:此方法根据给定的关系将给定的FlowFile传输到适当的目标处理器工作队列。如果关系导致多个目标,则复制FlowFile的状态,使得每个目标都接收FlowFile的精确副本,尽管每个目标都具有其自己的唯一标识。
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
}
else {
session.transfer(flowFile, REL_SUCCESS)
}
}
9、以指定的日志记录级别向日志发送消息
将log变量与warn(),trace(),debug(),info()或error()方法一起使用。这些方法可以使用单个String,或者后跟对象数组的String,或者后跟对象数组后跟Throwable的String。第一个用于简单消息。当您有一些要记录的动态对象/值时,将使用第二个。要在消息字符串中引用这些,请在消息中使用“{}”。这些是按照外观的顺序针对Object数组进行评估的,因此如果消息显示为“Found these things:{} {} {}”并且Object数组为[‘Hello’,1,true],则记录的消息将为“找到这些东西:你好1真的”。这些日志记录方法的第三种形式也采用Throwable参数。
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)