1、参考文献
https://www.taosdata.com/cn/documentation/
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148

一、TD下载安装

参考TAOS官方文档:https://www.taosdata.com/blog/2019/08/09/566.html

1、创建测试超级表,数据库以及插入数据

  1. taos> CREATE DATABASE mhs KEEP 365 DAYS 10 BLOCKS 4 UPDATE 1;
  2. taos> use mhs;
  3. taos> CREATE STABLE mhsfj (point_time timestamp, value_double double) TAGS (tagname binary(64));
  4. taos> INSERT INTO MHSDJL_NX_GD_FG_L1_P1_001_ASF234 USING MHSDJL TAGS ("MHSDJL_NX_GD_FG_L1_P1_001_ASF234") VALUES (now,0.1234);
  5. -------
  6. 创建子表
  7. CREATE TABLE IF NOT EXISTS MHSFJ_NX_GD_MHSF_FJ_P2_L4_040_AI0062 USING mhsfj TAGS ("MHSFJ_NX_GD_MHSF_FJ_P2_L4_040_AI0062");
  8. CREATE TABLE IF NOT EXISTS MHSDJL_NX_GD_FG_L1_P1_001_ASF234 USING mhsdjl TAGS ("MHSDJL_NX_GD_FG_L1_P1_001_ASF234");

如下错误:

  1. sudo: /etc/sudoers is world writable
  2. sudo: no valid sudoers sources found, quitting
  3. sudo: unable to initialize policy plugin

解决:

  1. pkexec chmod 555 /etc/sudoers
  2. pkexec chmod 555 /etc/sudoers.d

二、需求:

1、实现nifi上管理读取Taos_TDengine时序数据库

三、实现

1、Nifi使用ExecuteSQL处理器实现查询TAOS_TDengine遇到问题,如下:

QNI@@1I{}WI%E9UD2RAUVJK.png

  1. ExecuteSQL[id=83ba910e 0179-1000-765e 608004eae304] Unable to execute SQL select query select * from test.meters due to java.sql.sQLFeatureNotSupportedException: this operation is NOT supported currently!. No FlowFile to route to failure: org.apache.nifi.processor.exception.ProcessException: java.sql.SQLFeatureNotSupporedException: this operation is NOT supported currently!
  2. -------------------------------------------------------------------------------------------
  3. ExecuteSQL 无法执行SQL select查询:"select*from test.meters",原因是java.SQL.sQLFeatureNotSupportedException:当前不支持此操作!没有要路由到失败的流文件:org.apache.nifi.processor.exception.ProcessException:java.sql.sqlfeaturenotsupploredexception:当前不支持此操作!

2、Groovy脚本实现查询TAOS_TDengine


  1. 添加JDBCTaos.jar包到${NIFI_HOME}/lib目录

image.png

  1. 添加libtaos.so环境变量
    1. echo "LD_LIBRARY_PATH=/usr/lib" >> /etc/profile
    2. echo "export LD_LIBRARY_PATH" >> /etc/profile
    3. source /etc/profile

注:添加taosjar到nifilib时,taosjar要从maven下载,且版本跟当前td服务版本一致,不然会出现jni connection is null

  1. 重启Nifi
  2. 添加ExecuteGroovyScript处理器

image.png

  1. Groovy脚本写入到ScriptBody一栏,脚本如下: ```groovy // 导入相关类 import java.nio.charset.StandardCharsets import java.sql.Connection import java.sql.PreparedStatement import java.sql.ResultSet import java.sql.SQLException import java.sql.DriverManager import java.sql.PreparedStatement import java.sql.ResultSet import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets

// 获取流文件 // flowFile = session.get() // if(!flowFile)return flowFile = session.create()

// 创建方法实现连接TAOS_TDengine def getConnect(String jdbcUrl) { Class.forName(“com.taosdata.jdbc.TSDBDriver”) Connection conn = DriverManager.getConnection(jdbcUrl) if(null == conn) { throw new SQLException(“数据库创建失败,请检查配置信息”) session.transfer(flowFile, REL_FAILURE) } return conn }

// 调用方法注入url获取连接 Connection conn = getConnect(“jdbc:TAOS://10.168.1.16:6030/test?user=root&password=taosdata”) // 处理SQL PreparedStatement ps = conn.prepareStatement(“select * from meters;”) // 执行 ResultSet rs = ps.executeQuery() // 声明容器存储结果 StringBuffer stringBuffer = new StringBuffer() // 遍历结果 while (rs.next()) { stringBuffer.append(rs.getString(1)) stringBuffer.append(“\n”) }

// 写入flowFile中 session.write(flowFile, {outputStream -> outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)

// flowFile —> success session.transfer(flowFile, REL_SUCCESS)

  1. 6. **添加PutFile处理器测试接收数据**
  2. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621502947614-f73f8810-85d5-4f1b-871e-7975897592ee.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=451&id=u2e5cdc98&margin=%5Bobject%20Object%5D&name=image.png&originHeight=902&originWidth=1920&originalType=binary&ratio=1&size=181460&status=done&style=none&taskId=u74bc9d03-87ac-447f-beaf-7bb137bf026&width=960)
  3. 7. **执行运行**
  4. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503019178-74610113-457e-4680-91dd-06a5d35772d6.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=451&id=u0058db50&margin=%5Bobject%20Object%5D&name=image.png&originHeight=902&originWidth=1920&originalType=binary&ratio=1&size=191763&status=done&style=none&taskId=u9a7195e9-812b-465f-8a43-12699c56dae&width=960)
  5. 8. **linux查看文件**
  6. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503157169-28b4d527-79d7-4d25-9ab5-e3c8098e4170.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=151&id=uf7b75624&margin=%5Bobject%20Object%5D&name=image.png&originHeight=302&originWidth=1384&originalType=binary&ratio=1&size=56233&status=done&style=none&taskId=u42fd7ac1-7cf0-4e85-8f1d-07d1ff461b0&width=692)
  7. 9. **结果如下**
  8. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2680099/1621503191124-f2bffeec-a5de-4d0c-a1dc-524136cd30b5.png#clientId=u4cb4bc0c-2e27-4&from=paste&height=143&id=uf39ae047&margin=%5Bobject%20Object%5D&name=image.png&originHeight=285&originWidth=1584&originalType=binary&ratio=1&size=63249&status=done&style=none&taskId=u3e9db516-0c29-4190-a89c-5b2f89ab0d7&width=792)
  9. <a name="jxYGZ"></a>
  10. ## 四、nifi groovy常用函数
  11. <a name="aHuSD"></a>
  12. #### 1、从会话中获取传入的流文件
  13. ```groovy
  14. flowFile = session.get();
  15. if (flowFile != null) {
  16. // All processing code goes here
  17. }

2、从会话中获取多个传入流文件

  1. flowFileList = session.get100
  2. if(!flowFileList.isEmpty()){
  3. flowFileList.each {flowFile->
  4. //在这里处理每个FlowFile
  5. }
  6. }

3、创建一个新的FlowFile

  1. flowFile = session.create()
  2. //这里的附加处理flowFile = session.create()

4、从父FlowFile创建新的FlowFile

使用会话对象中的create(parentFlowFile)方法。此方法采用父FlowFile引用并返回新的子FlowFile对象。新创建的FlowFile将继承除UUID之外的所有父属性。此方法将自动生成Provenance FORK事件或Provenance JOIN事件,具体取决于在提交ProcessSession之前是否从同一父级生成其他FlowFiles。

  1. var flowFile = session.get();
  2. if (flowFile != null) {
  3. var newFlowFile = session.create(flowFile);
  4. // Additional processing here
  5. }

5、向流文件添加属性

使用会话对象中的putAttribute(flowFile,attributeKey,attributeValue)方法。此方法使用给定的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

  1. var flowFile = session.get();
  2. if (flowFile != null) {
  3. flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
  4. }

6、向流文件添加多个属性

使用会话对象中的putAllAttributes(flowFile,attributeMap)方法。此方法使用给定Map中的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

  1. var number2 = 2;
  2. var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
  3. var flowFile = session.get()
  4. if (flowFile != null) {
  5. flowFile = session.putAllAttributes(flowFile, attrMap)
  6. }

7、从流文件中获取属性

使用FlowFile对象中的getAttribute(attributeKey)方法。此方法返回给定attributeKey的String值,如果未找到attributeKey,则返回null。这些示例显示了“filename”属性值的检索。

  1. var flowFile = session.get()
  2. if (flowFile != null) {
  3. var myAttr = flowFile.getAttribute('filename')
  4. }
  5. var flowFile = session.get()
  6. if (flowFile != null) {
  7. var attrs = flowFile.getAttributes();
  8. for each (var attrKey in attrs.keySet()) {
  9. // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
  10. }
  11. }

8、将流文件传输到关系

使用会话对象中的transfer(flowFile,relationship)方法。从文档中:此方法根据给定的关系将给定的FlowFile传输到适当的目标处理器工作队列。如果关系导致多个目标,则复制FlowFile的状态,使得每个目标都接收FlowFile的精确副本,尽管每个目标都具有其自己的唯一标识。

  1. var flowFile = session.get();
  2. if (flowFile != null) {
  3. // All processing code goes here
  4. if(errorOccurred) {
  5. session.transfer(flowFile, REL_FAILURE)
  6. }
  7. else {
  8. session.transfer(flowFile, REL_SUCCESS)
  9. }
  10. }

9、以指定的日志记录级别向日志发送消息

将log变量与warn(),trace(),debug(),info()或error()方法一起使用。这些方法可以使用单个String,或者后跟对象数组的String,或者后跟对象数组后跟Throwable的String。第一个用于简单消息。当您有一些要记录的动态对象/值时,将使用第二个。要在消息字符串中引用这些,请在消息中使用“{}”。这些是按照外观的顺序针对Object数组进行评估的,因此如果消息显示为“Found these things:{} {} {}”并且Object数组为[‘Hello’,1,true],则记录的消息将为“找到这些东西:你好1真的”。这些日志记录方法的第三种形式也采用Throwable参数。

  1. var ObjectArrayType = Java.type("java.lang.Object[]");
  2. var objArray = new ObjectArrayType(3);
  3. objArray[0] = 'Hello';
  4. objArray[1] = 1;
  5. objArray[2] = true;
  6. log.info('Found these things: {} {} {}', objArray)