一、介绍

本开发人员指南的目的是为读者提供了解Apache NiFi扩展的开发方式所需的信息,并帮助他们解释开发组件背后的思考过程。它提供了对用于开发扩展的API的介绍和说明。但是,由于本指南旨在补充API的JavaDoc,而不是替代它们,因此并未详细介绍API中的每种方法。本指南还假定读者熟悉Java 7和Apache Maven。
本指南由开发人员为开发人员编写。希望在阅读本指南之前,您对NiFi和数据流的概念有基本的了解。如果没有,请参阅《NiFi概述》 和《NiFi用户指南》以熟悉NiFi的概念。

二、NiFi组件

NiFi提供了几个扩展点,使开发人员能够向应用程序添加功能以满足他们的需求。以下列表对最常见的扩展点进行了高级描述:

1、Processor

Processor接口是一种机制,NiFi通过该机制公开对FlowFile,其属性和内容的访问。处理器是用于构成NiFi数据流的基本构建块。此接口用于完成以下所有任务:

  1. Create FlowFiles(创建FlowFiles)
  2. Read FlowFile content(读取FlowFile内容)
  3. Write FlowFile content(编写FlowFile内容)
  4. Read FlowFile attributes(读取FlowFile属性)
  5. Update FlowFile attributes(更新FlowFile属性)
  6. Ingest data(提取数据)
  7. Egress data(出口数据)
  8. Route data(路由数据)
  9. Extract data(提取数据)
  10. Modify data(修改数据)

2、ReportingTask

Nifi公开ReportingTask接口是一种机制,它允许将度量标准,监视信息和内部NiFi状态发布到外部端点,例如日志文件,电子邮件和远程Web服务。

3、ControllerService

ControllerService在单个JVM中跨处理器,其他ControllerService和ReportingTask提供共享状态和功能。一个示例用例可能包括将非常大的数据集加载到内存中。通过在ControllerService中执行此工作,数据可以一次加载并通过此服务公开给所有处理器,而不需要许多不同的处理器自己加载数据集。

4、FlowFilePrioritizer

FlowFilePrioritizer接口提供了一种机制,通过该机制,可以对队列中的FlowFile进行优先级排序或排序,以便可以按对特定用例最有效的顺序处理FlowFiles。

5、AuthorityProvider

AuthorityProvider负责确定应授予给定用户哪些特权和角色(如果有)。

三、Processor API

Process是NiFi中使用最广泛的组件。Process是唯一有权访问以创建,删除,修改或检查FlowFiles(数据和属性)的组件。
所有处理器都使用Java的ServiceLoader机制加载和实例化。这意味着所有处理器都必须遵守以下规则:

  • 处理器必须具有默认构造函数。
  • 处理器的JAR文件必须在META-INF / services目录中包含名为的条目 org.apache.nifi.processor.Processor。这是一个文本文件,其中的每一行都包含处理器的完全限定的类名。

虽然Processor是可以直接实现的接口,但是这样做org.apache.nifi.processor.AbstractProcessor几乎是很少见的,因为几乎所有处理器实现都是该类的基类。本AbstractProcessor类提供的功能的显著量,这使得开发的处理器更容易,更方便的任务。对于本文档,我们将主要关注AbstractProcessor处理Processor API的类。
#并发注释
NiFi是一个高度并发的框架。这意味着所有扩展都必须是线程安全的。如果不熟悉用Java编写并发软件,强烈建议您熟悉Java并发原理。

1、Supporting API

为了理解Processor API,我们必须首先(至少在较高层次上)理解几个支持的类和接口,下面将对其进行讨论。

1.1 FlowFile

FlowFile是一种逻辑概念,它使一条数据与关于该数据的一组属性相关联。这些属性包括FlowFile的唯一标识符,其名称,大小以及任何其他特定于流的值。虽然FlowFile的内容和属性可以更改,但FlowFile对象是不可变的。通过ProcessSession可以对FlowFile进行修改。
FlowFiles的核心属性在org.apache.nifi.flowfile.attributes.CoreAttributes枚举中定义。你会看到最常用的属性是filename,path和uuid。括号中的字符串是CoreAttributes枚举中属性的值以及它在UI / API中的显示方式。

  • Filename(filename):FlowFile的文件名。文件名不应包含任何目录结构。
  • UUID(uuid):分配给此FlowFile的通用唯一标识符,用于将FlowFile与系统中的其他FlowFile区分开。
  • Path(path):FlowFile的路径指示FlowFile所属的相对目录,并且不包含文件名。
  • Absolute Path(absolute.path):FlowFile的绝对路径指示FlowFile所属的绝对目录,并且不包含文件名。
  • Priority(priority):指示FlowFile优先级的数值。
  • MIME Type(mime.type):此FlowFile的MIME类型。
  • Discard Reason(discard.reason):指定丢弃FlowFile的原因。
  • Alternate Identifier(alternate.identifier):表示除FlowFile的UUID之外的已知标识符,该标识符可引用该FlowFile。

其他常见属性
尽管这些属性不是CoreAttributes枚举的成员,但它们实际上是整个系统的标准,并且可以在大多数FlowFiles上找到。

  • File Size(fileSize):FlowFile内容的大小(以字节为单位)。
  • Entry Date(entryDate):FlowFile进入系统(即已创建)的日期和时间。此属性的值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。
  • Lineage Start Date(lineageStartDate):每当克隆,合并或拆分FlowFile时,都会导致创建“子” FlowFile。然后,在克隆,合并或拆分这些子代时,便构建了一个祖先链。此值表示最早的祖先进入系统的日期和时间。考虑这一点的另一种方法是,该属性表示FlowFile通过系统的延迟。该值是一个代表自1970年1月1日午夜(UTC)以来的毫秒数。

    1.2 ProcessSession

    ProcessSession,通常简称为“session”,提供了一种机制,通过它可以创建,销毁,检查,克隆和转移FlowFiles并将其转移到其他处理器。此外,ProcessSession提供了一种机制,用于通过添加或删除属性或修改FlowFile的内容来创建修改版本的FlowFiles。ProcessSession还公开了一种发出来源事件的机制,该机制提供了跟踪FlowFile的血统和历史的功能。在一个或多个FlowFiles上执行操作后,可以提交或回滚ProcessSession。

    1.3 ProcessContext

    ProcessContext提供了处理器与框架之间的桥梁。它提供有关处理器当前配置方式的信息,并允许处理器执行特定于框架的任务,例如产生其资源,以便框架调度其他处理器运行,而不必消耗不必要的资源。
    属性描述符
    PropertyDescriptor定义将由Processor,ReportingTask或ControllerService使用的属性。属性的定义包括其名称,属性说明,可选的默认值,验证逻辑以及关于是否需要该属性才能使Processor有效的指示符。通过实例化该类的实例PropertyDescriptor.Builder ,调用适当的方法以填充有关属性的详细信息并最终调用该build方法来创建PropertyDescriptor 。

    1.4 Validatioon

    PropertyDescriptor必须指定一个或多个验证器,这些验证器可用于确保用户输入的属性值有效。如果验证器指示属性值无效,则在该属性变为有效之前,组件将无法运行或使用。如果未指定验证器,则该组件将被视为无效,并且NiFi将报告该属性不受支持。

    1.5 ValidatioonContext

    验证属性值时,可以使用ValidationContext获取ControllerServices,创建PropertyValue对象以及使用表达式语言编译和评估属性值。

    1.6 PropertyValue

    返回给Processor的所有属性值都以PropertyValue对象的形式返回。该对象具有方便的方法,可以将值从字符串转换为其他形式,例如数字和时间段,以及提供用于评估表达式语言的API。

    1.7 Relationship

    Relationship定义了FlowFile可能从处理器传输到的路由。通过实例化该类的实例Relationship.Builder ,调用适当的方法以填充Relationship的详细信息,最后调用该build方法,可以创建关系 。
    1.8 StateManager
    StateManager为处理器,报告任务和控制器服务提供了一种易于存储和检索状态的机制。该API与ConcurrentHashMap相似,但是每个操作都需要一个Scope。范围指示状态是要在本地还是在整个集群范围内检索/存储。有关更多信息,请参见状态管理器部分。

    1.8 ProcessorInitializationContext

    创建处理器后,initialize将使用InitializationContext对象调用其方法。该对象向处理器公开在处理器的整个生命周期内都不会改变的配置,例如处理器的唯一标识符。

    1.9 ComponentLog

    建议Processors通过ComponentLog接口执行其日志记录 ,而不是获取第三方记录器的直接实例。这是因为通过ComponentLog进行的日志记录允许框架将超出可配置严重性级别的日志消息呈现给用户界面,从而允许在发生重要事件时通知监视数据流的人员。此外,它通过在调试模式下记录堆栈跟踪并在日志消息中提供处理器的唯一标识符,为所有处理器提供一致的日志记录格式。

    2、AbstractProcessor API

    由于绝大多数处理器将通过扩展AbstractProcessor来创建,因此我们将在本节中研究它是抽象类。AbstractProcessor提供了处理器开发人员感兴趣的几种方法。

    2.1 Processor Initialization

    创建处理器时,在调用任何其他方法之前,init将调用AbstractProcessor的 方法。该方法采用单个参数,类型为 ProcessorInitializationContext。上下文对象为处理器提供了ComponentLog,处理器的唯一标识符和ControllerServiceLookup,可用于与已配置的ControllerServices进行交互。每个这样的对象是由AbstractProcessor存储,并且可以由子类经由获得getLogger,getIdentifier和 getControllerServiceLookup方法,分别。

    2.2 Exposing Processor’s Relationship

    为了使处理器能够将FlowFile传输到新的目的地进行后续处理,处理器必须首先能够向框架公开其当前支持的所有关系。这允许应用程序的用户通过在处理器之间创建连接并为这些连接分配适当的关系来将处理器彼此连接。
    处理器通过覆盖getRelationships方法公开有效的关系集 。这个方法没有参数,并返回Set的Relationship 对象。对于大多数处理器,此Set将是静态的,但是其他处理器将根据用户配置动态生成Set。对于那些Set是静态的Processor,建议在Processor的构造函数或init方法中创建一个不可变的Set并返回该值,而不是动态生成Set。这种模式使其更干净的代码和更好的性能。

    2.3 Exposing Processor Properties

    大多数处理器在使用前都需要一定数量的用户配置。处理器支持的属性通过getSupportedPropertyDescriptors方法公开给框架 。这个方法没有参数,并返回List的 PropertyDescriptor对象。列表中对象的顺序很重要,因为它决定了将在用户界面中呈现属性的顺序。
    PropertyDescriptor目的是通过创建一个新的实例构造PropertyDescriptor.Builder对象,调用构建器的适当的方法,并最终调用build方法。
    尽管此方法涵盖了大多数用例,但有时还是希望允许用户配置名称未知的其他属性。这可以通过重写getSupportedDynamicPropertyDescriptor方法来实现 。此方法以String作为其唯一参数,该参数指示属性的名称。该方法返回一个PropertyDescriptor对象,该 对象可用于验证属性名称和值。从此方法返回的任何PropertyDescriptor都应构建isDynamic,并将PropertyDescriptor.Builder类中的value设置为true 。AbstractProcessor的默认行为是不允许任何动态创建的属性。

    2.4 Validating Processor Properties()

    如果处理器的配置无效,则无法启动它。可以通过在PropertyDescriptor上设置Validator或通过PropertyDescriptor.Builder的allowableValues方法来限制属性的允许值来实现对Processor属性的验证identifiesControllerService。
    此外,如果一个属性依赖于另一个属性(通过PropertyDescriptor.Builder的’dependsOn方法),并且不满足该依赖性,则将验证该属性。
    例如,考虑以下两个属性描述符: ```java PropertyDescriptor USE_FILE = new PropertyDescriptor.Buildler() .name(“Use File”) .displayName(“Use File”) .required(true) .allowableValues(“true”, “false”) .defaultValue(“true”) .build();

PropertyDescriptor FILE = new PropertyDescriptor.Builder() .name(“File to Use”) .displayName(“File to Use”) .required(true) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .dependsOn(USE_FILE, “true”) .build();

  1. 在这种情况下,如果“Use File”属性设置为true,则处理器将无效,除非“File to Use”属性设置为有效的文件名。如果“Use File”设置为true,并且“File to Use”未设置值,则处理器将无效(因为需要“File to Use”属性)。如果“Use File”设置为true,并且“File to Use”设置了值,但指定的文件不存在,则处理器也将无效,因为根据验证程序,“File to Use”属性无效。<br />但是,如果“Use File”属性设置为false,则“File to Use”属性具有不满足的依赖关系。因此,验证中将不考虑“File to Use”属性。因此,如果“Use File”设置为false,并且“File to Use”没有表示值,则处理器仍然有效(即使需要“File to Use”,但仅当“Use File”为true时才需要)。同样,如果将“File to Use”设置为不存在的文件名,例如/File/that/does/not/exist,则只要“Use File”属性设置为false,处理器仍然有效。<br />此外,“File to Use”属性甚至不会显示在NiFi UI中,除非“Use File”设置为true。<br />然而,有时仅验证处理器的属性还不够。为此,AbstractProcessor公开了一个customValidate方法。该方法采用type的单个参数ValidationContext。此方法的返回值是一个Collection ValidationResult描述验证过程中发现的任何问题的对象。仅应返回isValid方法返回的那些ValidationResult对象 false。仅当所有属性均根据其关联的“验证器”和“允许值”有效时,才会调用此方法。即,仅当所有属性本身都有效时才调用此方法,并且此方法允许对处理器的配置进行整体验证。
  2. <a name="wHEdu"></a>
  3. #### 2.5 Responding to Changes in Configuration(响应配置更改)
  4. 有时需要让处理器在其属性更改时迅速做出反应。该onPropertyModified 方法允许处理器做到这一点。当用户更改处理器的属性值时,onPropertyModified将为每个修改后的属性调用该 方法。该方法采用三个参数:PropertyDescriptor,它指示修改了哪个属性,旧值和新值。如果该属性没有先前的值,则第二个参数为null。如果删除了该属性,则第三个参数为null。重要的是要注意,无论值是否有效,都将调用此方法。仅在实际修改值时才调用此方法,而不是在用户更新Processor而不更改其值时调用此方法。在调用此方法时,可以确保调用此方法的线程是处理器中当前正在执行代码的唯一线程,除非处理器本身创建了自己的线程。
  5. <a name="HhROp"></a>
  6. #### 2.6 Performing the Work
  7. 当处理器有工作要做时,安排它onTrigger通过框架调用其方法来进行。该方法有两个参数:aProcessContexta ProcessSession。该onTrigger方法的第一步通常是通过getProcessSession上调用其中一个方法来获取要在其上执行工作的FlowFile 。对于从外部来源将数据导入NiFi的处理器,将跳过此步骤。然后,处理器可以自由检查FlowFile属性。添加,删除或修改属性;读取或修改FlowFile内容;并将FlowFiles传输到适当的Relationships
  8. <a name="V2Vhc"></a>
  9. #### 2.7 When Processors are Triggere
  10. onTrigger仅当计划运行处理器并且该处理器存在工作时,才会调用处理器的方法。如果满足以下任一条件,则据说存在处理器的工作:
  11. - 以处理器为目的地的连接在其队列中至少有一个FlowFile
  12. - 处理器没有传入的连接
  13. - 处理器带有@TriggerWhenEmpty批注
  14. 存在多个因素,这些因素将在onTrigger调用Processor 方法时起作用。首先,除非用户将处理器配置为运行,否则不会触发处理器。如果安排处理器运行,则框架会定期(该时间段由用户在用户界面中配置)检查处理器是否有工作要做,如上所述。如果是这样,框架将检查处理器的下游目标。如果处理器的任何出站连接已满,则默认情况下不会安排处理器运行。<br />但是,@TriggerWhenAnyDestinationAvailable可以将注释添加到处理器的类。在这种情况下,将更改需求,以使只有一个下游目标必须“可用”(如果连接队列未满,则将目标视为“可用”),而不是要求所有下游目标均可用。<br />@TriggerSerially 注释也与处理器调度有关。使用此注释的处理器永远不会有多个线程onTrigger同时运行该方法。但是,至关重要的是要注意,执行代码的线程可能会在调用之间变化。因此,仍必须注意确保处理器是线程安全的!
  15. <a name="Mf5QX"></a>
  16. ### 3、Component Lifecycle(组件生命周期)
  17. NiFi API通过使用Java注释提供生命周期支持。该org.apache.nifi.annotation.lifecycle软件包包含一些用于生命周期管理的注释。以下注释可以应用于NiFi组件中的Java方法,以指示框架何时应调用这些方法。为了讨论组件生命周期,我们将NiFi组件定义为ProcessorControllerServicesReportingTask
  18. <a name="lj0Xk"></a>
  19. #### 3.1 @OnAdded
  20. @OnAdded注解导致要尽快一个组件被创建调用的方法。构造组件后,将调用组件的initialize方法(或init方法,如果是子类,则 该方法AbstractProcessor),然后是用注释的方法@OnAdded。如果任何带有方法注释的方法@OnAdded引发Exception,则将向用户返回错误,并且该组件不会添加到流程中。此外,将不会调用带有此注释的其他方法。在组件的整个生命周期中,只会调用一次此方法。具有此注释的方法必须采用零参数。
  21. <a name="kFM4L"></a>
  22. #### 3.2 @OnEnabled
  23. 所述@OnEnabled注释可以被用来指示每当使能控制器服务的方法应该被调用。每次用户启用服务时,将调用具有此批注的任何方法。另外,每次重新启动NiFi时,如果将NiFi配置为“自动恢复状态”并且启用了服务,则将调用该方法。<br />如果带有此注释的方法抛出Throwable,将为该组件发出日志消息和公告。在这种情况下,服务将保持“启用”状态,并且将无法使用。带有此注释的所有方法将在延迟后再次被调用。在返回带有该批注的所有方法而不会抛出任何结果之前,该服务将不可用。<br />使用此批注的方法必须采用0个参数或一个type类型的参数org.apache.nifi.controller.ConfigurationContext。<br />**注意**:如果将其应用于ReportingTaskProcessor,则将忽略该注释。对于Controller Service,启用和禁用被视为生命周期事件,因为该操作使它们可以被其他组件使用或无法使用。但是,对于处理器和报告任务,这些不是生命周期事件,而是一种在启动或停止一组组件时允许排除某个组件的机制。
  24. <a name="dN5qt"></a>
  25. #### 3.3 @OnRemoved
  26. @OnRemoved注释将使得前一组分从流中除去要被调用的方法。这样可以在删除组件之前清理资源。具有此批注的方法必须采用零参数。如果带有此批注的方法抛出Exception,则该组件仍将被删除。
  27. <a name="UeFgI"></a>
  28. #### 3.4 @OnScheduled
  29. 此注释指示每次计划运行组件时都应调用一个方法。由于未计划ControllerServices,因此**在ControllerService上使用此批注没有意义**,也将不被接受。仅应将其用于处理器和报告任务。如果具有此注释的任何方法都引发Exception,则不会调用具有该注释的其他方法,并且将向用户显示通知。在这种情况下, @OnUnscheduled将触发使用@OnStopped注释的方法,然后触发带有注释的方法 (在此状态期间,如果这些方法中的任何一个抛出Exception,则将忽略这些Exception)。然后,该组件将在一段时间内产生其执行,称为“管理收益持续时间”nifi.properties文件。最后,该过程将再次开始,直到所有带有注释的方法 @OnScheduled都已返回而没有引发任何Exception。具有此批注的方法可以采用零参数,也可以采用单个参数。如果使用单个参数变体,则ProcessContext如果组件是ProcessorConfigurationContext组件是ReportingTask ,则参数必须是类型。
  30. <a name="luM12"></a>
  31. #### 3.5 @OnUnscheduled
  32. 每当不再计划运行ProcessorReportingTask时,将调用带有此批注的方法。那时,处理器的onTrigger方法中可能仍有许多线程处于活动状态。如果此类方法引发Exception,则将生成日志消息,否则将忽略Exception,并且仍将调用带有此批注的其他方法。具有此批注的方法可以采用零参数,也可以采用单个参数。如果使用单个参数变体,则ProcessContext如果组件是Processor ConfigurationContext组件是ReportingTask ,则参数必须是类型
  33. <a name="WNjQz"></a>
  34. #### 3.6 @OnStopped
  35. 当不再调度ProcessorReportingTask并已从该onTrigger方法返回所有线程时,将调用带有此注释的方法。如果此类方法引发Exception,则会生成一条日志消息,否则将忽略Exception;否则,将忽略该异常。具有此注释的其他方法仍将被调用。带有此批注的方法允许使用01参数。如果使用了参数,则如果组件是ReportingTask,则它必须是ConfigurationContext类型;如果组件是Processor,则它必须是ProcessContext类型。
  36. <a name="Ljlew"></a>
  37. #### 3.7 @OnShutdown
  38. @OnShutdown成功关闭NiFi后,将调用带有注释的任何方法。如果此类方法引发Exception,则将生成日志消息,否则将忽略Exception,并且仍将调用带有此批注的其他方法。具有此批注的方法必须采用零参数。注意:尽管NiFi会尝试在使用该注释的所有组件上调用带有此注释的方法,但这并不总是可能的。例如,该进程可能被意外终止,在这种情况下,它没有机会调用这些方法。因此,例如,尽管使用此批注的方法可用于清理资源,但不应将其用作关键业务逻辑。
  39. <a name="tJbWf"></a>
  40. ### 4、Component Notification(组件通知)
  41. NiFi API通过使用Java注释提供通知支持。该org.apache.nifi.annotation.notification软件包包含一些用于通知管理的注释。以下注释可以应用于NiFi组件中的Java方法,以指示框架何时应调用这些方法。对于组件通知的讨论,我们将NiFi组件定义为处理器,控制器服务或报告任务。
  42. <a name="S5Azp"></a>
  43. #### 4.1 @OnPrimaryNodeStateChange
  44. @OnPrimaryNodeStateChange注解导致要只要主节点的集群中的状态发生了变化调用的方法。具有此批注的方法应该不带参数,也不能带一个type类型的参数PrimaryNodeState。该PrimaryNodeState提供什么改变,使得组件可以采取适当的行动内容。该PrimaryNodeState枚举有两个可能的值: ELECTED_PRIMARY_NODE(节点接收到这个状态已经当选NiFi集群的主节点),或 PRIMARY_NODE_REVOKED(接收到这个状态的节点是主节点,但现在已经有其主节点角色撤销)。
  45. <a name="nmMEE"></a>
  46. ### 5、Restricted(受限制的)
  47. Restricted组件是一种组件,可以用于执行操作员通过NiFi REST API / UI提供的任意未经消毒的代码,也可以用于使用NiFi OS凭据在NiFi主机系统上获取或更改数据。这些组件可能会由其他经过授权的NiFi用户使用,以超出应用程序的预期用途,提升特权,或者可能公开有关NiFi进程或主机系统内部的数据。所有这些功能都应被视为特权,管理员应意识到这些功能,并为一部分受信任的用户显式启用它们。<br />可以使用@Restricted批注标记处理器,控制器服务或报告任务。这将导致该组件被视为受限组件,并且需要将用户明确添加到可以访问受限组件的用户列表中。一旦允许用户访问受限制的组件,则将在允许所有其他权限的情况下允许他们创建和修改那些组件。如果无法访问受限制的组件,则用户仍将知道存在这些类型的组件,但是即使拥有足够的权限也将无法创建或修改它们。
  48. <a name="YUkUW"></a>
  49. ### 6、State Manager(状态管理)
  50. 组件可以从ProcessContextReportingContextControllerServiceInitializationContext调用该getStateManager()方法。该状态管理器负责提供用于存储和检索状态的简单API。该机制旨在为开发人员提供非常容易地存储一组键/值对,检索这些值并自动更新它们的能力。状态可以存储在节点本地或群集中所有节点之间。重要的是要注意,但是,该机制仅旨在提供一种用于存储非常“简单”状态的机制。因此,API仅允许Map<String, String>进行存储和检索,并以原子方式替换整个地图。此外,ZooKeeper支持当前支持存储集群范围状态的唯一实现。这样,整个状态图在序列化后必须小于1 MB。尝试存储更多内容将导致引发异常。如果处理器为管理状态所需的交互比这更复杂(例如,必须存储和检索大量数据,或者必须分别存储和提取单个密钥),则应使用不同的机制(例如,与之通信)外部数据库)。
  51. <a name="qmG9m"></a>
  52. ### 7、Scope(范围)
  53. 与状态管理器进行通信时,所有方法调用都要求提供范围。此范围将为Scope.LOCALScope.CLUSTER。如果NiFi在群集中运行,则此作用域向框架提供有关如何进行操作的重要信息。<br />如果使用来存储状态Scope.CLUSTER,那么集群中的所有节点都将使用相同的状态存储机制进行通信。如果使用状态存储和检索状态Scope.LOCAL,则每个节点将看到状态的不同表示形式。<br />值得注意的是,如果将NiFi配置为作为独立实例运行,而不是在群集中运行,Scope.LOCAL则始终使用范围。这样做是为了允许NiFi组件的开发人员以一种一致的方式编写代码,而不必担心NiFi实例是否集群。开发人员应改为假定实例是集群的,并相应地编写代码。
  54. <a name="z0lx8"></a>
  55. ### 8、Storing and Retrieving State(存储和检索状态)
  56. 状态使用StateManagergetStatesetStatereplaceclear方法存储。所有这些方法都需要提供一个范围。应该注意的是,使用本地作用域存储的状态与使用集群作用域存储的状态完全不同。如果处理器使用作用域中的“我的键”的键存储值。然后尝试使用范围检索值。本地范围,检索到的值将为null(除非使用scope.CLUSTER范围使用相同的键存储了值)。每个处理器的状态与其他处理器的状态隔离存储。<br />因此,两个处理器不能共享相同的状态。然而,在某些情况下,非常有必要在不同类型的两个处理器或相同类型的两个处理器之间共享状态。这可以通过使用控制器服务来实现。通过从控制器服务中存储和检索状态,多个处理器可以使用相同的控制器服务,并且可以通过控制器服务的API公开状态。
  57. <a name="jhajQ"></a>
  58. ### 9、Unit Tests(单元测试)
  59. NiFiMock框架提供了广泛的工具集,可以执行处理器的单元测试。处理器单元测试通常从TestRunner类开始。结果,TestRunner该类包含了getStateManager自己的方法。但是,返回的StateManager具有特定类型:MockStateManager。除了StateManager接口定义的方法外,此实现还提供了几种方法,可帮助开发人员更轻松地开发单元测试。<br />首先,MockStateManager实现StateManager接口,因此可以从单元测试中检查所有状态。此外,MockStateManager公开了一些assert*方法来执行声明状态已按预期设置的断言。该MockStateManager还提供了指示:如果状态为特定更新单元测试应该立即失败的能力 Scope
  60. <a name="aSOgy"></a>
  61. ### 10、Reporting Processor Activity(报告处理器活动)
  62. 处理器负责报告其活动,以便用户能够了解其数据发生了什么。处理器应通过ComponentLog记录事件,该组件可通过InitializationContext或调用的getLogger方法进行访问AbstractProcessor。<br />此外,处理器应使用ProvenanceReporter 通过ProcessSession的接口获得的接口。 getProvenanceReporter方法。ProvenanceReporter应该用于指示从外部源接收内容或将其发送到外部位置的任何时间。ProvenanceReporter还具有用于报告何时克隆,创建或修改FlowFile,以及何时将多个FlowFile合并到单个FlowFile中以及将FlowFile与某些其他标识符关联的方法。但是,这些功能对于报告来说并不是很关键,因为该框架能够检测到这些东西并代表处理器发出适当的事件。但是,对于处理器开发人员来说,发出这些事件是一种最佳实践,因为在代码中明确指出了这些事件正在被发出,并且开发人员能够为事件提供其他详细信息,例如时间的长短。所采取的措施或有关所采取措施的相关信息。如果处理器发出事件,则框架不会发出重复事件。取而代之的是,它始终假定Processor开发人员比Framework更好地了解Processor上下文中发生的事情。但是,框架可能会发出不同的事件。例如,如果处理器同时修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile发出了任何其他事件(由Processor或框架),则该框架不会发出ATTRIBUTES_MODIFIED事件。这是由于所有 它总是假设Processor开发人员比框架更好地了解Processor上下文中发生的事情。但是,框架可能会发出不同的事件。例如,如果处理器同时修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile发出了任何其他事件(由Processor或框架),则该框架不会发出ATTRIBUTES_MODIFIED事件。这是由于所有 它总是假设Processor开发人员比框架更好地了解Processor上下文中发生的事情。但是,框架可能会发出不同的事件。例如,如果处理器同时修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile发出了任何其他事件(由Processor或框架),则该框架不会发出ATTRIBUTES_MODIFIED事件。这是由于所有 如果为该FlowFile发出了任何其他事件(由Processor或框架),则该框架不会发出ATTRIBUTES_MODIFIED事件。这是由于所有 如果为该FlowFile发出了任何其他事件(由Processor或框架),则该框架不会发出ATTRIBUTES_MODIFIED事件。这是由于所有 [来源事件](http://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#provenance_events)知道事件发生之前FlowFile的属性以及由于对该FlowFile的处理而发生的那些属性,因此ATTRIBUTES_MODIFIED通常被认为是多余的,并会导致FlowFile世系的呈现非常冗长。但是,如果从处理器的角度考虑该事件是相关的,则处理器也可以与其他事件一起发出此事件。
  63. <a name="vEQlp"></a>
  64. ## 四、Documenting a Component
  65. NiFi试图通过NiFi应用程序本身通过用户界面向用户提供大量文档,从而使用户体验尽可能简单和方便。为了做到这一点,处理器开发人员当然必须将该文档提供给框架。NiFi公开了几种用于向框架提供文档的机制。
  66. <a name="Znmjz"></a>
  67. ### 1、Documenting Properties(记录属性)
  68. 可以通过调用description PropertyDescriptor的构建器的方法来记录单个属性,如下所示:
  69. ```java
  70. public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder()
  71. .name("My Property")
  72. .description("Description of the Property")
  73. ...
  74. .build();

如果该属性提供一组允许值,则这些值将在UI的下拉字段中显示给用户。这些值中的每一个也可以给出描述:

  1. public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive",
  2. "Everything will be logged - use with caution!");
  3. public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose",
  4. "Quite a bit of logging will occur");
  5. public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular",
  6. "Typical logging will occur");
  7. public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
  8. .name("Amount to Log")
  9. .description("How much the Processor should log")
  10. .allowableValues(REGULAR, VERBOSE, EXTENSIVE)
  11. .defaultValue(REGULAR.getValue())
  12. ...
  13. .build();

2、Documenting Relationships(记录关系)

记录处理器关系的方式与属性的记录方式几乎相同-通过调用description关系的构建器的方法:

  1. public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
  2. .name("My Relationship")
  3. .description("This relationship is used only if the Processor fails to process the data.")
  4. .build();

3、Documenting Capability and Keywords(记录能力和关键字)

该org.apache.nifi.annotation.documentation软件包提供了Java注释,可用于记录组件。可以将CapabilityDescription批注添加到“处理器”,“报告任务”或“控制器服务”中,并旨在提供对该组件提供的功能的简要说明。标签注释具有一个value定义为字符串数组的变量。这样,它通过提供多个值作为带有花括号的字符串的逗号分隔列表来使用。然后,通过允许用户基于标签(即关键字)过滤组件,将这些值合并到UI中。此外,UI提供了标签云,允许用户选择他们要过滤的标签。云中最大的标签是在该NiFi实例中组件上最多的那些标签。下面提供了使用这些注释的示例:

  1. @Tags({"example", "documentation", "developer guide", "processor", "tags"})
  2. @CapabilityDescription("Example Processor that provides no real functionality but is provided" +
  3. " for an example in the Developer Guide")
  4. public static final ExampleProcessor extends Processor {
  5. ...
  6. }

4、Documenting FlowFile Attribute Interaction(记录FlowFile属性交互)

很多时候,处理器会期望在入站FlowFiles上设置某些FlowFile属性,以使处理器正常运行。在其他情况下,处理器可以在出站FlowFile上更新或创建FlowFile属性。处理器开发人员可以使用ReadsAttribute和WritesAttribute文档注释来记录这两种行为。这些属性用于生成文档,使用户可以更好地了解处理器如何与流进行交互。
注意:因为Java 7不支持在类型上重复的注释,所以您可能需要使用ReadsAttributes和WritesAttributes来指示处理器读取或写入多个FlowFile属性。此注释只能应用于处理器。下面列出了一个示例:

  1. @WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
  2. @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
  3. @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
  4. @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
  5. @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
  6. @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server") })
  7. public final class InvokeHTTP extends AbstractProcessor {

5、Documenting Related Components(记录相关组件)

通常,处理器和控制器服务相互关联。有时它是一个PUT / GET关系中PutFile和GetFile。有时,处理器使用Controller服务,例如InvokeHTTP和StandardSSLContextService。有时,一个ControllerService使用另一个像DistributedMapCacheClientService和DistributedMapCacheServer。这些扩展点的开发人员可以使用SeeAlso标签将这些不同的组件相关联。该注释链接了文档中的这些组件。 SeeAlso可以应用于Processors,ControllerServices和ReportingTasks。下面列出了如何执行此操作的示例:

  1. @SeeAlso(GetFile.class)
  2. public class PutFile extends AbstractProcessor {

6、Advanced Documentation(进阶文件)

当上述文档编制方法不够用时,NiFi可以通过“Usage”文档向用户公开更高级的文档。当用户右键单击处理器时,NiFi会在上下文菜单中提供“Usage”菜单项。此外,UI在右上角显示“Help”链接,从中可以找到相同的“用法”信息。
处理器的高级文档以名为的HTML文件形式提供additionalDetails.html。该文件应存在于名称为Processor的标准名称的目录中,并且该目录的父目录应命名为 docs并且存在于处理器jar的根目录中。该文件将从生成的HTML文件链接,该文件将包含所有Capability,Keyword,PropertyDescription和Relationship信息,因此无需重复该文件。这是一个提供有关此处理器正在执行的操作,其预期和产生的数据类型以及其预期和产生的FlowFile属性的丰富解释的地方。因为此文档为HTML格式,所以您可能包含图像和表格以最好地描述此组件。可以使用相同的方法来提供有关Processors,ControllerServices和ReportingTasks的高级文档。

五、Provenance Events

来源报告的不同事件类型是:

Provenance Event Description
ADDINFO 指示出处事件,用于添加其他信息,例如将新链接链接到新的URI或UUID
ATTRIBUTES_MODIFIED 指示以某种方式修改了FlowFile的属性。同时报告另一个事件时,不需要此事件,因为另一个事件已经包含所有FlowFile属性
CLONE 指示FlowFile是其父FlowFile的完全重复。
CONTENT_MODIFIED 指示以某种方式修改了FlowFile的内容。使用此事件类型时,建议提供有关如何修改内容的详细信息
CREATE 表示从远程系统或外部进程未接收到的数据生成了FlowFile
DOWNLOAD 指示FlowFile的内容是由用户或外部实体下载的
DROP 表示出于某种原因(不是对象过期)而结束对象生命的起源事件
EXPIRE 指示由于未及时处理对象而结束对象生命的起源事件
FETCH 指示使用某些外部资源的内容覆盖了FlowFile的内容。这类似于RECEIVE事件,但不同之处在于,RECEIVE事件旨在用作将FlowFile引入系统的事件,而FETCH用于指示现有FlowFile的内容已被覆盖
FORK 指示一个或多个FlowFiles是从父FlowFile派生的
JOIN 指示单个FlowFile是从多个父FlowFiles连接在一起而派生的
RECEIVE 指示从外部流程接收数据的出处事件。预期此事件类型是FlowFile的第一个事件。因此,从外部源接收数据并使用该数据替换现有FlowFile内容的处理器应使用FETCH事件类型,而不是RECEIVE事件类型。
REMOTE_INVOCATION 指示已请求对外部端点的远程调用(例如,删除远程资源)。外部端点可能存在于远程或本地系统中,但在NiFi外部
REPLAY 指示用于重播FlowFile的出处事件。事件的UUID表示正在重播的原始FlowFile的UUID。该事件包含一个父UUID(也是正在重播的FlowFile的UUID)和一个子UUID(一个新创建的FlowFile的UUID),该UUID将重新排队以进行处理
ROUTE 表示将FlowFile路由到指定的关系,并提供有关为何将FlowFile路由到此关系的信息
SEND 指示将数据发送到外部流程的出处事件
UNKNOWN 表示来源事件的类型未知,因为尝试访问该事件的用户无权知道该类型

六、Common Processor Patterns(通用处理器模式)

NiFi用户可以使用许多不同的处理器,但其中绝大多数都属于几种常见的设计模式之一。下面,我们讨论这些模式,适当的模式,遵循这些模式的原因以及应用这些模式时需要注意的事项。请注意,下面讨论的模式和建议是一般准则,而不是严格的规则。

1、Data Ingress(数据入口)

将数据提取到NiFi中的处理器具有一个名为的关系success。该处理器通过ProcessSessioncreate方法生成新的FlowFiles,并且不从传入的Connections中提取FlowFiles。处理器名称以“ Get”或“ Listen”开头,具体取决于它轮询外部源还是公开一些外部源可以连接到的接口。名称以用于通信的协议结尾。遵循这种模式的处理器包括GetFile,GetSFTP, ListenHTTP,和GetHTTP。
该处理器可以使用@OnScheduled注释的方法创建或初始化连接池 。但是,由于通信问题可能会阻止建立连接或导致连接终止,因此此时不会创建连接本身。而是在该onTrigger方法中从池中创建或租用连接。
该onTrigger处理器的方法开始于在可能的情况下从连接池中租借连接,否则创建与外部服务的连接。当没有来自外部源的可用数据yield时,ProcessContext的方法将由Processor调用,并且该方法将返回,以便该Processor避免持续运行和耗尽资源而无益。否则,该处理器然后通过ProcessSession的create 方法创建FlowFile,并为FlowFile分配适当的文件名和路径(通过添加filename和path 属性),以及其他任何适当的属性。FlowFile内容的OutputStream是通过ProcessSession的write方法,传递一个新的OutputStreamCallback(通常是一个匿名内部类)。在此回调中,处理器能够写入FlowFile,并将内容从外部资源流式传输到FlowFile的OutputStream。如果希望将InputStream的全部内容写入FlowFile,则importFromProcessSession方法可能比该write方法更方便使用 。
当此处理器希望接收许多小文件时,建议在提交会话之前从单个会话创建多个FlowFiles。通常,这使框架可以更有效地处理新创建的FlowFiles的内容。
该处理器生成一个Provenance事件,指示它已接收到数据,并指定数据来自何处。该处理器应记录FlowFile的创建,以便在必要时可以通过分析日志来确定FlowFile的来源。
该处理器确认收到数据和/或从外部源中删除数据,以防止收到重复文件。仅在提交了创建FlowFile的ProcessSession之后才执行此操作!不遵守该原则可能会导致数据丢失,因为在提交会话之前重新启动NiFi将导致删除临时文件。但是请注意,可以使用这种方法来接收重复的数据,因为在提交会话之后以及在从外部源确认或删除数据之前,应用程序可以重新启动。但是,总的来说,潜在数据重复优先于潜在数据丢失。该连接最终被返回或添加到连接池中,具体取决于该连接是从连接池中租借的开始还是在onTrigger方法中创建的。
如果出现通信问题,则连接通常会终止并且不会返回(或添加)到连接池中。断开与远程系统的连接,并使用带有@OnStopped注释的方法关闭连接池,以便可以回收资源。

2、Data Egress(数据出口)

将数据发布到外部源的处理器具有两个关系:success和failure。处理器名称以“ Put”开头,后跟用于数据传输的协议。遵循此模式的处理器包括PutEmail,PutSFTP和 PostHTTP(注意,名称不以“ Put”开头,因为这会引起混淆,因为在处理HTTP时,PUT和POST具有特殊含义)。
该处理器可以使用@OnScheduled注释的方法创建或初始化连接池 。但是,由于通信问题可能会阻止建立连接或导致连接终止,因此此时不会创建连接本身。而是在该onTrigger方法中从池中创建或租用连接。
该onTrigger方法首先通过该get方法从ProcessSession获得FlowFile 。如果没有FlowFile可用,该方法将返回而不获取与远程资源的连接。
如果至少有一个FlowFile可用,则处理器(如果可能)从连接池获取连接,否则创建新连接。如果处理器既无法从连接池中租借连接也无法创建新连接,则将FlowFile路由到failure,记录该事件,然后该方法返回。
如果获得连接,则处理器通过read在ProcessSession上调用方法并传递InputStreamCallback(通常是匿名内部类)来获取FlowFile内容 的InputStream,并从该回调中将FlowFile的内容传输到目的地。记录该事件以及传输文件所花费的时间以及传输文件的数据速率。通过从ProcessSession中通过getProvenanceReporter方法获取报告者并在报告者上调用该send方法,将SEND事件报告给ProvenanceReporter 。连接是返回还是添加到“连接池”中,具体取决于连接是从池中租用还是由onTrigger方法新创建 。
如果出现通信问题,则连接通常会终止并且不会返回(或添加)到连接池中。如果将数据发送到远程资源存在问题,则用于处理错误的所需方法取决于一些注意事项。如果问题与网络状况有关,则通常将FlowFile路由到failure。FlowFile不会受到惩罚,因为数据没有必要出现问题。与数据入口处理器不同,我们通常不调用yieldProcessContext。这是因为在摄取的情况下,在处理器能够执行其功能之前,FlowFile不存在。但是,在放置处理器的情况下,DataFlow Manager可以选择路由failure到另一个处理器。在一个系统出现问题的情况下,这可以允许使用“备份”系统,或者可以将其用于跨多个系统的负载分配。
如果发生与数据相关的问题,则应采用两种方法之一。首先,如果问题很可能会自行解决,则对FlowFile进行惩罚,然后将其路由到 failure。例如,对于PutFTP,就是这种情况,因为文件命名冲突而无法传输FlowFile。前提是该文件最终将从目录中删除,以便可以传输新文件。结果,我们对FlowFile进行了惩罚,并路由到, failure以便稍后再试。在另一种情况下,如果数据存在实际问题(例如数据不符合某些要求的规范),则可以采用其他方法。在这种情况下,将failure关系分解为afailure和a可能是有利的 communications failure关系。这使DataFlow Manager可以确定如何分别处理每种情况。在这些情况下,通过在创建关系时在“描述”中阐明这两个关系之间的差异,来很好地记录下来是很重要的。
断开与远程系统的连接,并使用注释的方法关闭连接池,@OnStopped以便可以回收资源。

3、Route Based On Cotent(One-to-One)(基于内容的路由(一对一))

根据其内容路由数据的处理器将采用以下两种形式之一:将传入的FlowFile恰好路由到一个目的地,或将传入的数据路由到0个或多个目的地。在这里,我们将讨论第一种情况。
该处理器具有两个关系:matched和unmatched。如果期望使用特定的数据格式,则处理器也将具有一种failure关系,当输入的输入格式不是期望的格式时。处理器公开指示路由标准的属性。
如果指定路由标准的属性需要处理(例如,编译正则表达式),则在@OnScheduled可能的情况下,使用注释方法来完成此处理。然后将结果存储在标记为的成员变量中volatile。
该onTrigger方法获得单个FlowFile。该方法通过ProcessSession的方法读取FlowFile的内容,read 并在数据流传输时评估匹配条件。然后,处理器根据条件是否匹配来确定是否应将FlowFile路由到matched或路由到unmatched该文件,并将FlowFile路由到适当的关系。
然后,处理器发出一个Provenance ROUTE事件,该事件指示处理器将FlowFile路由到哪个关系。
该处理程序带有 软件包中的@SideEffectFree和 @SupportsBatching注释org.apache.nifi.annotation.behavior。

4、Route Based on Content(One-to Many)(基于内容的路由(一对多))

如果处理器将单个FlowFile路由到潜在的许多关系,则该处理器将与上述用于基于内容路由数据的处理器略有不同。该处理器通常具有由用户动态定义的unmatched关系以及一个关系。
为了使用户能够额外定义属性,getSupportedDynamicPropertyDescriptor必须重写该方法。此方法返回具有提供的名称的PropertyDescriptor和适用的验证器,以确保用户指定的匹配条件有效。
在此处理器中,getRelationships方法返回的关系集 是标记为的成员变量volatile。此Set最初使用名为的单个Relationship构造unmatched。onPropertyModified重写该方法,以便在添加或删除属性时,将使用相同的名称创建一个新的Relationship。如果处理器具有非用户定义的属性,则检查指定的属性是否为用户定义很重要。这可以通过调用isDynamic传递给此方法的PropertyDescriptor的方法。如果此属性是动态的,则将创建新的一组关系,并将先前的一组关系复制到其中。此新Set要么添加了新创建的Relationship,要么从中删除了,这取决于是向处理器添加了新的Property还是删除了Property(通过检查此函数的第三个参数是否为,可以检测到删除属性null)。然后,将更新包含关系集的成员变量以指向该新集合。
如果指定路由标准的属性需要处理(例如,编译正则表达式),则在@OnScheduled可能的情况下,使用注释方法来完成此处理。然后将结果存储在标记为的成员变量中volatile。该成员变量通常是类型Map,其中键是类型Relationship,并且值的类型由处理属性值的结果定义。
该onTrigger方法通过getProcessSession的方法获得FlowFile 。如果没有FlowFile可用,它将立即返回。否则,将创建一个“关系类型”集。该方法通过ProcessSession的方法读取FlowFile的内容,并在read传输数据时评估每个匹配条件。对于任何匹配的标准,与该“匹配标准”相关联的关系将添加到“关系集”中。
读取FlowFile的内容后,该方法检查“关系集”是否为空。如果是这样,则原始FlowFile会添加一个属性,以指示将其路由到的关系并路由到unmatched。记录此消息,发出Provenance ROUTE事件,然后该方法返回。如果Set的大小等于1,则原始FlowFile会添加一个属性,以指示该路由文件已路由到的Relation,并被路由到Set中条目指定的Relationship。记录下来,为FlowFile发出一个Provenance ROUTE事件,然后该方法返回。
如果该集合包含多个关系,则处理器为每个关系(第一个关系除外)创建FlowFile的克隆。这是通过cloneProcessSession的方法完成的。无需报告“克隆源事件”,因为该框架将为您处理。原始FlowFile和每个克隆都被路由到其适当的Relationship,带有指明该关系名称的属性。为每个FlowFile发出一个Provenance ROUTE事件。记录下来,方法返回。
该处理程序带有 软件包中的@SideEffectFree和 @SupportsBatching注释 org.apache.nifi.annotation.behavior。

5、Route Streams Based on Content(One-to-Many)(根据内容路由流(一对多))

前面对基于内容的路由的描述(一对多)为创建功能非常强大的处理器提供了抽象。但是,它假定每个FlowFile都将全部路由到零个或多个关系。如果传入的数据格式是许多不同信息的“流”,并且我们希望将该流的不同部分发送到不同的关系,该怎么办?例如,假设我们想要一个RouteCSV处理器,以便它配置有多个正则表达式。如果CSV文件中的一行与正则表达式匹配,则该行应包含在与关联关系相关的出站FlowFile中。如果正则表达式与关系“有苹果”相关联,并且该正则表达式与FlowFile中的1000行匹配,对于“ has-apples”关系,应该有一个出站FlowFile,其中有1000行。如果将不同的正则表达式与“ has-oranges”关系关联,并且该正则表达式与FlowFile中的50行匹配,则“ has-oranges”关系中应该有一个出站FlowFile,其中包含50行。即,一个FlowFile进来,两个FlowFiles出来。这两个FlowFiles可能包含与原始FlowFile相同的某些文本行,或者它们可能完全不同。这是我们将在本节中讨论的处理器类型。对于“ has-oranges”关系,应该有一个出站FlowFile,其中有50行。即,一个FlowFile进来,两个FlowFiles出来。这两个FlowFiles可能包含与原始FlowFile相同的某些文本行,或者它们可能完全不同。这是我们将在本节中讨论的处理器类型。对于“ has-oranges”关系,应该有一个出站FlowFile,其中有50行。即,一个FlowFile进来,两个FlowFiles出来。这两个FlowFiles可能包含与原始FlowFile相同的某些文本行,或者它们可能完全不同。这是我们将在本节中讨论的处理器类型。
该处理器的名称以“ Route”开头,并以其路由的数据类型的名称结尾。在此处的示例中,我们正在路由CSV数据,因此处理器被命名为RouteCSV。该处理器支持动态属性。每个用户定义的属性都有一个名称,该名称映射到关系的名称。属性的值采用“匹配条件”所必需的格式。在我们的示例中,该属性的值必须是有效的正则表达式。
该处理器维护一个内部ConcurrentMap,其中键是a Relationship,值的类型取决于匹配条件的格式。在我们的示例中,我们将维持 ConcurrentMap。该处理器重写该 onPropertyModified方法。如果提供给此方法的新值(第三个参数)为null,则将从ConcurrentMap中删除其名称由属性名称(第一个参数)定义的Relationship。否则,将处理新值(在我们的示例中,通过调用Pattern.compile(newValue)),并将此值添加到ConcurrentMap,并且键再次为Relationship,其关系由属性名称指定。
该处理器将覆盖该customValidate方法。在此方法中,它将从中检索所有属性,ValidationContext并计算动态的PropertyDescriptor的数量(通过调用isDynamic() PropertyDescriptor)。如果动态PropertyDescriptors的数量为0,则表明用户尚未添加任何关系,因此处理器返回 ValidationResult指示,表明处理器无效,因为未添加任何关系。
当getRelationships调用处理器的方法时,处理器将返回用户指定的所有关系,并且还将返回一个unmatched关系。因为此处理器将必须读写内容存储库(这可能相对昂贵),所以如果希望该处理器用于非常大的数据量,则添加允许用户指定是否指定属性的属性可能是有利的他们是否关心与任何“匹配条件”都不匹配的数据。
onTrigger调用该方法时,处理器将通过获取FlowFile ProcessSession.get。如果没有可用数据,则处理器返回。否则,处理器将创建一个Map。我们将此地图称为flowFileMap。处理器通过调用读取传入的FlowFileProcessSession.read 并提供一个InputStreamCallback。在回调中,处理器从FlowFile中读取第一条数据。然后,处理器针对这一数据评估每个匹配标准。如果特定条件(在我们的示例中为正则表达式)匹配,则处理器从属于flowFileMap适当关系的那个位置获取FlowFile 。如果在Map中没有针对该关系的FlowFile,则处理器通过调用创建新的FlowFile session.create(incomingFlowFile),然后将新的FlowFile添加到flowFileMap。然后,处理器通过调用写入这一块数据到FlowFile的session.append 用OutputStreamCallback。在此OutputStreamCallback中,我们可以访问新的FlowFile的OutputStream,因此我们可以将数据写入新的FlowFile。然后,我们从OutputStreamCallback返回。遍历每个匹配条件后,如果没有一个匹配,我们将为unmatched关系执行与上述相同的例程 (除非用户将我们配置为不写出不匹配的数据)。现在我们已经调用了session.append,我们有了一个新版本的FlowFile。结果,我们需要更新flowFileMap以将Relationship与新的FlowFile关联。
如果在任何时候抛出异常,我们将需要将传入的FlowFile路由到failure。我们还将需要删除每个新创建的FlowFile,因为我们不会将它们传输到任何地方。我们可以通过致电来完成此操作session.remove(flowFileMap.values())。此时,我们将记录错误并返回。
否则,如果一切成功,我们现在可以遍历 flowFileMap并将每个FlowFile转移到相应的Relationship。然后,将原始FlowFile删除或路由到某个original关系。对于每个新创建的FlowFiles,我们还发出一个Provenance ROUTE事件,指示FlowFile转到哪个关系。在ROUTE事件的细节中包括此FlowFile中包含多少信息也是有帮助的。这使DataFlow Manager在查看“源谱系”视图时可以轻松查看给定输入FlowFile的每个关系有多少条信息。
此外,某些处理器可能需要对发送到每个关系的数据进行“分组”,以使发送到关系的每个FlowFile具有相同的值。在我们的示例中,我们可能希望允许正则表达式具有一个捕获组,并且如果CSV中的两个不同行与正则表达式匹配,但捕获组具有不同的值,我们希望将它们添加到两个不同的FlowFiles中。然后可以将匹配值作为属性添加到每个FlowFile。这可以通过修改来实现flowFileMap,使得其被定义为Map>其中T是分组功能的类型(在我们的例子中,该集团将是一个String,因为它是评价一个正则表达式的捕获组的结果)。

6、Route Based on Attributes(基于属性的路线)

该处理器几乎与上述基于内容处理器的路由数据相同。它采用两种不同的形式:一对一和一对多,以及基于内容的路由处理器。但是,此处理器不对ProcessSession的read 方法进行任何调用,因为它不读取FlowFile内容。该处理器通常非常快,因此@SupportsBatching在这种情况下,注释可能非常重要。

7、Split Content(Ont-toMany)(分割内容(一对多))

该处理器通常不需要用户配置,除了要创建的每个拆分的大小。该onTrigger方法从其输入队列中获取FlowFile。将创建一个FlowFile类型的列表。原始FlowFile通过ProcessSession的read方法读取,并使用InputStreamCallback。在InputStreamCallback中,将读取内容,直到达到分割FlowFile的位置为止。如果不需要拆分,则回调返回,并将原始FlowFile路由到success。在这种情况下,将发出Provenance ROUTE事件。通常,将FlowFile路由到时不会发出ROUTE事件success因为这会产生非常冗长的血统,因此很难导航。但是,在这种情况下,该事件很有用,因为否则我们会期望发生FORK事件,而缺少任何事件都可能引起混乱。将记录FlowFile未被拆分而是被转移到的事实success,然后该方法返回。
如果到达需要拆分FlowFile的位置,则通过ProcessSession的create(FlowFile)方法或 clone(FlowFile, long, long)方法创建一个新的FlowFile 。代码的下一部分取决于create使用clone方法还是使用方法。两种方法都在下面介绍。哪种解决方案合适,必须根据具体情况确定。
当数据不会直接从原始FlowFile复制到新FlowFile时,最适合使用Create方法。例如,如果将仅复制一些数据,或者如果将数据复制到新的FlowFile之前以某种方式进行了修改,则此方法是必需的。但是,如果新FlowFile的内容是原始FlowFile一部分的精确副本,则首选克隆方法。
创建方法 如果使用该create方法,则以原始FlowFile作为参数来调用该方法,以便新创建的FlowFile将继承原始FlowFile的属性,并且框架将创建Provenance FORK事件。
然后代码进入一个try/finally块。在该finally 块内,新创建的FlowFile被添加到已创建的FlowFiles列表中。这是在一个finally块中完成的,因此,如果引发异常,则将适当清理新创建的FlowFile。在该try块内,回调通过write使用OutputStreamCallback调用ProcessSession的方法来启动新的回调。然后,将适当的数据从原始FlowFile的InputStream复制到新FlowFile的OutputStream。
克隆方法 如果新创建的FlowFile的内容只是原始FlowFile字节的连续子集,则最好使用该clone(FlowFile, long, long)方法而不是create(FlowFile)ProcessSession的 方法。在这种情况下,新FlowFile内容应从其开始的原始FlowFile的偏移量作为该clone方法的第二个参数传递。新FlowFile的长度作为第三个参数传递给该clone 方法。例如,如果原始FlowFile为10,000字节,而我们调用clone(flowFile, 500, 100),则将返回给我们的FlowFile将与flowFile就其属性而言。但是,新创建的FlowFile的内容长度为100个字节,并且将从原始FlowFile的偏移量500开始。也就是说,新创建的FlowFile的内容与复制原始FlowFile的字节500到599相同。
创建克隆后,会将其添加到FlowFiles列表中。
如果适用,此方法比Create方法更受青睐,因为不需要磁盘I / O。该框架能够简单地创建一个引用原始FlowFile内容的子集的新FlowFile,而无需实际复制数据。但是,这并不总是可能的。例如,如果标头信息必须从原始FlowFile的开头复制并添加到每个Split的开头,则此方法是不可能的。
两种方法 不管使用克隆方法还是创建方法,以下内容均适用:
如果在InputStreamCallback的任何点达到了无法继续处理的条件(例如,输入格式错误),ProcessException则应抛出a。对ProcessSessionread方法的调用被包装在一个被捕获的try/catch块中ProcessException。如果捕获到异常,则会生成一条日志消息,说明错误。通过ProcessSession的remove 方法删除新创建的FlowFiles列表。原始FlowFile被路由到failure。
如果没有问题,则将原始FlowFile路由到,original 并将所有新创建的FlowFile更新为包括以下属性:

属性名称 描述
split.parent.uuid 原始FlowFile的UUID
split.index 一个单向数字,指示这是列表中的哪个FlowFile(创建的第一个FlowFile将具有一个值0,第二个FlowFile将具有一个值,依此类推1)
split.count 已创建的拆分FlowFiles的总数

新创建的FlowFiles被路由到success; 记录此事件;然后该方法返回。

8、Update Attributes Based on Content(根据内容更新属性)

该处理器与上面讨论的基于内容处理器的路由非常相似。而不是路由到FlowFilematched或unmatched,所述FlowFile通常路由到success或failure 和属性被添加到FlowFile适当。以类似于基于内容的路由(一对多)的方式配置要添加的属性,用户可以定义自己的属性。属性的名称指示要添加的属性的名称。该属性的值指示要应用于数据的某些匹配条件。如果匹配条件与数据匹配,则会添加一个属性名称与该属性相同的属性。属性的值是来自匹配内容的条件。
例如,评估XPath表达式的处理器可以允许输入用户定义的XPath。如果XPath与FlowFile的内容匹配,则该FlowFile将添加一个属性,该属性的名称等于该属性名称的名称,并且其值等于与该XPath匹配的XML元素或属性的文本内容。failure如果在此示例中传入的FlowFile不是有效的XML,则将使用该关系。success无论是否找到任何匹配项,都将使用该关系。然后可以在适当的时候使用它来路由FlowFile。
该处理器发出类型为ATTRIBUTES_MODIFIED的出处事件。

9、Enrich/Modify Content(丰富/修改内容)

“Enrich/Modify”模式非常普遍且通用。此模式负责任何常规的内容修改。在大多数情况下,此处理器用@SideEffectFree和@SupportsBatching注释标记 。处理器具有任意数量的必需和可选属性,具体取决于处理器的功能。处理器通常具有success和failure关系。failure当输入文件不是预期格式时,通常使用该关系。
该处理器获取FlowFile并使用ProcessSession的write(StreamCallback)方法对其进行更新,以便它既可以读取FlowFile的内容,又可以写入FlowFile内容的下一个版本。如果在回调过程中遇到错误,则回调将抛出ProcessException。对ProcessSessionwrite方法的调用被包装在一个try/catch块中,该 块捕获ProcessException 并路由FlowFile失败。
如果回调成功,则将发出CONTENT_MODIFIED来源事件。

七、Error Handling

编写处理器时,可能会发生几种不同的意外情况。如果处理器自身不处理错误,那么处理器开发人员必须了解NiFi框架如何运作的机制,这一点很重要,了解处理器期望进行何种错误处理也很重要。在这里,我们将讨论处理器在工作过程中应如何处理意外错误。

1、Exceptions within the Processor(处理器内的异常)

在执行onTrigger处理器的方法期间,许多事情可能会出错。常见的故障情况包括:

  • 传入的数据不是预期的格式。
  • 与外部服务的网络连接失败。
  • 向磁盘读取或写入数据失败。
  • 处理器或从属库中有错误。

这些条件中的任何一个都可能导致从处理器引发异常。从框架的角度来看,有两种可以逃避Processor的异常:ProcessException和所有其他异常。
如果从Processor抛出ProcessException,则框架将假定这是已知结果的失败。而且,这是以后尝试再次处理数据的成功条件。结果,该框架将回滚正在处理的会话,并惩罚正在处理的FlowFiles。
但是,如果有任何其他异常逃离了Processor,则框架将假定开发人员未将其视为失败。在这种情况下,框架还将回滚会话并惩罚FlowFiles。但是,在这种情况下,我们可能会遇到一些非常麻烦的情况。例如,处理器可能处于故障状态,并且可能会连续运行,从而耗尽系统资源,而没有提供任何有用的工作。例如,当连续抛出NullPointerException时,这是相当普遍的。为了避免这种情况,如果ProcessException以外的其他异常能够转义Processor的 onTrigger方法中,框架还将“管理上获得”处理器。这意味着在一段时间内不会触发处理器再次运行。时间是在nifi.properties文件中配置的,但是默认情况下是10秒。

2、Exceptions within a callback:IOException,RuntimeException(回调函数的异常)

更通常情况下,当一个异常在处理器中发生时,它从一个回调内发生(即, InputStreamCallback,OutputStreamCallback,或StreamCallback)。也就是说,在处理FlowFile的内容期间。允许回调抛出RuntimeException或IOException。在RuntimeException的情况下,此Exception将传播回该onTrigger方法。如果是 IOException,则将异常包装在ProcessException中,然后从框架中抛出该ProcessException。
出于这个原因,建议使用回调的处理器在一个try/catch块内这样做,并捕获它们希望其回调抛出的ProcessException任何其他异常RuntimeException。这是不建议处理器赶上一般Exception或Throwable个案,但是。不推荐这样做有两个原因。
首先,如果引发了意外的RuntimeException,则可能是一个错误,允许框架回滚会话将确保没有数据丢失,并确保DataFlow Managers可以通过将数据排入队列来处理他们认为合适的数据。地点。
其次,从回调引发IOException时,实际上有两种类型的IOException:从处理器代码引发的IOException(例如,数据不是预期的格式或网络连接失败),以及从IOException引发的异常。内容存储库(存储FlowFile内容的位置)。如果是后者,则框架将捕获此IOException并将其包装到一个FlowFileAccessExceptionExtended中RuntimeException。这是显式完成的,以便Exception将转义该onTrigger方法,并且框架可以适当地处理此条件。捕获一般Exception可以防止这种情况的发生。

3、Penalization VS.Yielding(惩罚与屈服)

在处理过程中发生问题时,该框架将公开两种方法,以允许处理器开发人员避免执行不必要的工作:“惩罚”和“屈服”。对于刚接触NiFi API的开发人员,这两个概念可能会造成混淆。开发人员可以通过调用FlowFile来惩罚FlowFilepenalize(FlowFile)ProcessSession的方法。这导致FlowFile本身在一段时间内无法被下游处理器访问。FlowFile不可访问的时间由DataFlow Manager通过在“处理器配置”对话框中设置“惩罚持续时间”设置来确定。默认值为30秒。通常,当处理器确定由于预期会自行解决的环境原因而无法处理数据时,便会执行此操作。一个很好的例子是PutSFTP处理器,如果SFTP服务器上已经存在一个具有相同文件名的文件,它将惩罚FlowFile。在这种情况下,处理器会惩罚FlowFile并将其路由到失败。然后,DataFlow Manager可以将故障路由回相同的PutSFTP处理器。这样,如果文件存在相同的文件名,处理器将在30秒内(或DFM已配置处理器使用的任何时间)不尝试再次发送文件。同时,它可以继续处理其他FlowFiles。
另一方面,屈服允许处理器开发人员向框架指示在一段时间内它将无法执行任何有用的功能。这通常发生在与远程资源进行通信的处理器上。如果处理器无法连接到远程资源,或者期望远程资源提供数据但报告它没有数据,则处理器应调用yield该ProcessContext对象,然后返回。这样,处理器就告诉框架它不应浪费资源来触发该处理器运行,因为它无能为力-最好使用这些资源来允许其他处理器运行。

4、Session Rollback(会话回滚)

到目前为止,当我们讨论时ProcessSession,我们通常将其简称为访问FlowFiles的机制。但是,它提供了另一个非常重要的功能,即事务性。在ProcessSession上调用的所有方法均作为事务发生。当我们决定结束交易时,可以通过调用 commit()或通过来完成rollback()。通常,这是由AbstractProcessor类处理的:如果onTrigger方法抛出Exception,则AbstractProcessor将捕获Exception,调用session.rollback(),然后重新抛出Exception。否则,AbstractProcessor将调用commit()ProcessSession。
但是,有时开发人员会希望显式回滚会话。可以随时通过调用rollback()orrollback(boolean)方法来完成此操作。如果使用后者,则布尔值指示从队列(通过ProcessSessionget方法)中拉出的那些FlowFiles在添加回队列之前是否应该受到惩罚。
当rollback被调用时,将已发生的该会话的FlowFiles任何修改被丢弃,这包括内容和属性的修改。另外,所有的种源的事件被回滚(与通过使值发射的任何SEND事件之外true的force参数)。然后将从输入队列中拉出的FlowFiles传输回输入队列(并可以选择惩罚),以便可以再次处理它们。
另一方面,当commit调用该方法时,FlowFile的新状态将保留在FlowFile存储库中,发生的所有Provenance事件都将保留在Provenance存储库中。先前的内容被销毁(除非另一个FlowFile引用相同的内容),并且FlowFiles被传输到出站队列,以便下一个处理器可以对数据进行操作。
注意使用org.apache.nifi.annotation.behavior.SupportsBatching 注释如何影响此行为也很重要。如果处理器使用此注释,则对的调用ProcessSession.commit可能不会立即生效。而是,可以将这些提交批处理在一起以提供更高的吞吐量。但是,如果处理器在任何时候回滚ProcessSession,则自上次调用以来的所有更改commit都将被丢弃,并且所有“批处理”提交都将生效。这些“批处理”的提交不会回滚。

八、General Design Considerations(一般设计注意事项)

设计处理器时,请牢记一些重要的设计。《开发人员指南》的这一部分将开发人员在创建处理器时应考虑的一些想法置于最前沿。

1、Consider the User(考虑用户)

开发处理器(或任何其他组件)时要记住的最重要的概念之一就是要创建的用户体验。重要的是要记住,作为此类组件的开发者,您可能具有其他人所没有的上下文的重要知识。应始终提供文档,以便那些不熟悉该过程的人能够轻松使用它。
考虑用户体验时,还必须注意一致性非常重要。最好遵循标准的命名约定。对于处理器名称,属性名称和值,关系名称以及用户将遇到的任何其他方面,都是如此。
简单至关重要!避免添加您不希望用户理解或更改的属性。作为开发人员,我们被告知硬编码值是不好的。但这有时会使开发人员暴露一些属性,这些属性在被要求澄清时告诉用户只保留默认值。这导致混乱和复杂。

2、Cohesion and Reusability(凝聚力和可重用性)

为了制作一个单一的,有凝聚力的单元,有时会吸引开发人员将多个功能组合到一个处理器中。当处理器希望输入数据的格式为X以便处理器可以将数据转换为格式Y并将新格式化的数据发送到某些外部服务时,情况就是如此。
采用这种格式来格式化特定端点的数据,然后将数据发送到同一处理器中的该端点有几个缺点:

  • 处理器变得非常复杂,因为它必须执行数据转换任务以及将数据发送到远程服务的任务。
  • 如果处理器无法与远程服务通信,它将把数据路由到failure关系。在这种情况下,处理器将负责再次执行数据转换。如果再次失败,则翻译将再次完成。
  • 如果我们有五个不同的处理器在发送数据之前将传入的数据转换为这种新格式,那么我们将有很多重复的代码。例如,如果架构更改,则必须更新许多处理器。
  • 当处理器完成向远程服务的发送后,这些中间数据将被丢弃。中间数据格式可能对其他处理器很有用。

为了避免这些问题,并使处理器具有更高的可重用性,处理器应始终坚持“做一件事情,做好事”的原则。此类处理器应分为两个单独的处理器:一个用于将数据从格式X转换为格式Y,另一个用于将数据发送到远程资源。

3、Naming Conventions(命名约定)

为了向用户提供一致的外观,建议处理器遵循标准的命名约定。以下是使用的标准约定的列表:

  • 从远程系统提取数据的处理器称为Get 或Get ,具体取决于它们是通过已知协议(例如GetHTTP或GetFTP)从任意来源轮询数据,还是从已知服务中提取数据(例如GetKafka)
  • 将数据推送到远程系统的处理器称为Put 或Put
  • 关系名称是小写字母,并使用空格来描述单词。
  • 财产名称将重要的单词大写,就像书名一样。

    4、Processor Behavior Annotations(处理器行为注释)

    创建处理器时,开发人员能够向框架提供有关如何最有效地利用处理器的提示。这是通过将注释应用于Processor的类来完成的。可以应用于处理器的注释存在于的三个子包中org.apache.nifi.annotation。那些在documentation子包被用于提供文档给用户。那些在lifecycle子包指导框架,方法应该在处理器中调用,以响应到相应的生命周期事件。那些在behavior包装帮助框架了解如何与处理器的调度和一般行为方面的互动。
    org.apache.nifi.annotation.behavior软件包中的以下注释可用于修改框架处理处理器的方式:

  • EventDriven:指示框架可以使用事件驱动的调度策略来调度处理器。此策略目前仍处于试验阶段,但会导致无法处理极高数据速率的数据流的资源利用率降低。

  • SideEffectFree:表示处理器在NiFi外部没有任何副作用。结果,框架可以自由地使用相同的输入多次调用处理器,而不会引起任何意外的结果。这意味着幂等行为。框架可通过执行操作(例如将ProcessSession从一个处理器转移到另一个处理器)来提高效率,这样,如果发生问题,许多处理器的操作可以回滚并再次执行。
  • SupportsBatching:此注释表明框架可以将多个ProcessSession提交批处理为一个提交。如果存在此注释,则用户将能够在“处理器的调度”选项卡中选择是偏爱高吞吐量还是较低的延迟。该注释应适用于大多数处理器,但有一个警告:如果处理器调用ProcessSession.commit,则不能保证数据已安全地存储在NiFi的Content,FlowFile和Provenance存储库中。结果,对于那些从外部源接收数据,提交会话,然后删除远程数据或确认与远程资源进行事务的处理器来说,这是不合适的。
  • TriggerSerially:当存在此批注时,框架将不允许用户调度多个并发线程来一次执行该onTrigger方法。相反,线程数(“并行任务”)将始终设置为1。这并没有,但是,意味着该处理器不必是线程安全的,因为这是执行的线程onTrigger调用之间可能发生改变。
  • PrimaryNodeOnly:群集时,Apache NiFi为处理器提供两种执行模式:“主节点”和“所有节点”。尽管在所有节点上运行可提供更好的并行性,但已知某些处理器在多个节点上运行时会导致意外行为。例如,某些处理器列出或从远程文件系统读取文件。如果此类处理器计划在“所有节点”上运行,则将导致不必要的重复甚至错误。此类处理器应使用此注释。应用此注释将限制处理器仅在“主节点”上运行。
  • TriggerWhenAnyDestinationAvailable:默认情况下,如果任何出站队列已满,NiFi不会安排处理器运行。这允许在处理器链的整个过程中施加背压。但是,即使出站队列之一已满,某些处理器也可能需要运行。此注释指示如果任何关系“可用”,则处理器应运行。如果没有使用该关系的连接都已满,则该关系被称为“可用”。例如,DistributeLoad Processor使用此注释。如果使用“循环”调度策略,则如果任何出站队列已满,则处理器将不会运行。但是,如果使用“下一个可用”调度策略,
  • TriggerWhenEmpty:默认行为是:仅当处理器的输入队列中至少有一个FlowFile或处理器没有输入队列(通常是“源”处理器)时,才触发处理器运行。应用此批注将导致框架忽略输入队列的大小,并触发处理器,而不管输入队列上是否有任何数据。例如,这在需要触发处理器以使其定期运行以使网络连接超时时很有用。
  • InputRequirement:默认情况下,所有处理器都将允许用户为处理器创建传入连接,但是如果用户未创建传入连接,则该处理器仍然有效并且可以安排运行。但是,对于预期被用作“源处理器”的处理器,这可能会使用户感到困惑,并且用户可能会尝试将FlowFiles发送到该处理器,仅使FlowFiles排队而不进行处理。相反,如果处理器期望传入的FlowFiles但没有输入队列,则处理器将被安排为运行但不执行任何工作,因为它将不会收到FlowFile,这也会导致混乱。因此,我们可以使用@InputRequirement注解,并为其提供一个值INPUT_REQUIRED,INPUT_ALLOWED或INPUT_FORBIDDEN。这为框架提供了有关何时应该使处理器无效或用户是否应该能够与处理器建立连接的信息。例如,如果一个处理器带有注释InputRequirement(Requirement.INPUT_FORBIDDEN),则用户甚至无法创建以该处理器为目标的连接。

    5、Data Buffering(数据缓冲)

    要记住的重要一点是NiFi提供了通用的数据处理功能。数据可以是任何格式。通常为处理器安排几个线程。NiFi新手开发人员经常犯的一个错误是将FlowFile的所有内容缓冲在内存中。尽管在某些情况下需要这样做,但除非有众所周知的数据格式,否则应尽可能避免。例如,负责对XML文档执行XPath的处理程序将需要加载整个数据内容存入内存。这通常是可以接受的,因为XML预计不会非常大。但是,搜索特定字节序列的处理器可用于搜索数百GB或更大的文件。
    与其将这些数据缓冲到内存中,不如在从内容存储库中流式传输数据时对它进行评估(例如,通过扫描InputStream提供给回调的内容ProcessSession.read)。当然,在这种情况下,我们不想从Content Repository中读取每个字节,因此我们将使用BufferedInputStream或以某种方式缓冲少量数据(如果适用)。

    九、Controller Services(控制器服务)

    该ControllerService接口允许开发人员以干净一致的方式在JVM之间共享功能和状态。该接口类似于该接口的Processor 接口,但是没有onTrigger方法,因为没有计划将Controller Services定期运行,并且Controller Services没有关系,因为它们没有直接集成到流中。而是,它们由处理器,报告任务和其他控制器服务使用。

    1、Developing a ControllerService(开发ControllerService)

    就像Processor接口一样,ControllerService接口公开用于配置,验证和初始化的方法。所有这些方法都与Processor接口的initialize方法相同,只是方法传递了a ControllerServiceInitializationContext而不是a ProcessorInitializationContext。
    控制器服务带有处理器没有的其他约束。控制器服务必须包含可扩展的接口ControllerService。然后只能通过实现接口与实现进行交互。例如,永远不会为Processor提供ControllerService的具体实现,因此必须仅通过extends接口引用该服务ControllerService。
    之所以存在此限制,主要是因为处理器可以存在于一个NiFi存档(NAR)中,而处理器所处的Controller Service的实现可以存在于另一个NAR中。这是由框架通过以如下方式动态实现公开的接口来实现的:框架可以切换到适当的ClassLoader并在具体实现上调用所需的方法。但是,为了使此工作有效,处理器和Controller Service实现必须共享Controller Service接口的相同定义。因此,这两个NAR必须依赖于容纳Controller Service接口的NAR。有关更多信息,请参见NiFi档案(NAR)

    2、Interaction with a ControllerService(与ControllerService交互)

    可以通过ProcessorService,另一个ControllerService或ReportingTask通过ControllerServiceLookup或使用identifiesControllerServicePropertyDescriptor的Builder类的方法来获取ControllerServices 。处理器可以通过传递给该initialize 方法的ProcessorInitializationContext获取ControllerServiceLookup 。同样,它由ControllerService从ControllerServiceInitializationContext获得,并由ReportingTask通过传递给该initialize方法的ReportingConfiguration对象获得。
    但是,对于大多数用例而言,identifiesControllerService 首选使用PropertyDescriptor Builder的方法,它是最简单的方法。为了使用此方法,我们创建了一个PropertyDescriptor,它如下引用控制器服务:

    1. public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    2. .name("SSL Context Service")
    3. .description("Specified the SSL Context Service that can be used to create secure connections")
    4. .required(true)
    5. .identifiesControllerService(SSLContextService.class)
    6. .build();

    使用此方法,将提示用户提供应使用的SSL上下文服务。这是通过为用户提供一个下拉菜单来实现的,无论执行何种实现,他们都可以从中选择已配置的任何SSLContextService配置。
    为了使用此服务,处理器可以使用以下代码:

    1. final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE)
    2. .asControllerService(SSLContextService.class);

    注意:这SSLContextService是扩展ControllerService的接口。目前唯一的实现是StandardSSLContextService。但是,处理器开发人员不必担心此细节。

    十、Reporting Tasks(报告任务)

    到目前为止,我们几乎没有提到如何将NiFi及其组件的性能传达给外界。系统是否能够跟上传入数据速率?系统还能处理多少?在一天的高峰时间与一天中最不繁忙的时间相比,要处理多少数据?
    为了回答这些问题,NiFi提供了一种通过ReportingTask 界面向外部服务报告状态,统计信息,指标和监视信息的功能。向ReportingTasks授予访问大量信息的权限,以确定系统的运行方式。

    1、Developing a Reporting Taks(制定报告任务)

    就像Processor和ControllerService接口一样,ReportingTask接口公开用于配置,验证和初始化的方法。这些方法均与Processor和ControllerService接口的initialize方法相同,不同之处在于,该 方法传递了一个ReportingConfiguration 对象,这与其他组件接收的初始化对象相反。ReportingTask还具有一种onTrigger由框架调用的方法,以触发任务执行其工作。
    在该onTrigger方法中,ReportingTask被授予对ReportingContext的访问权限,从中可以获取有关NiFi实例的配置和信息。BulletinRepository允许查询公告,并允许ReportingTask提交自己的公告,以便将信息呈现给用户。可通过上下文访问的ControllerServiceLookup提供对已配置的ControllerServices的访问。但是,这种获取Controller Services的方法不是首选方法。而是,获取控制器服务的首选方法是在PropertyDescriptor中引用控制器服务,如在与ControllerService交互中所讨论的。
    EventAccess通过ReportingContext公开的对象提供对的访问ProcessGroupStatus,后者公开了有关进程组,处理器,连接和其他组件在过去五分钟内处理的数据量的统计信息。此外,EventAccess对象还提供对已存储在中的ProvenanceEventRecords的访问ProvenanceEventRepository。当从外部来源接收数据,向外部服务发送数据,从系统中删除数据,根据做出的某些决定对其进行修改或路由时,处理器将发出这些来源事件。
    每个ProvenanceEvent都具有FlowFile的ID,事件的类型,事件的创建时间以及在组件访问FlowFile时与FlowFile关联的所有FlowFile属性以及与之关联的FlowFile属性。作为事件描述的处理结果的FlowFile。这为ReportingTasks提供了大量信息,从而允许以多种不同的方式生成报告,以暴露各种操作问题所需的指标和监视功能。

    十一、UI Extensions(UI扩展)

    NiFi提供了两个UI扩展点:

  • 自定义处理器用户界面

  • 内容查看者

可以创建自定义UI,以提供大多数处理器设置中可用的标准属性/值表以外的配置选项。具有自定义UI的处理器的示例是UpdateAttributeJoltTransformJSON

  • 可以创建内容查看器以扩展可在NiFi中查看的数据类型。NiFi在lib目录中随附NAR,其中包含内容查看器,用于数据类型(例如csv,xml,avro,json(标准nar))和图像类型(例如png,jpeg和gif(media-nar))。

    1、Custom Processor UIs(自定义处理器用户界面)

    要将自定义UI添加到处理器:

  • 创建您的UI。

  • 在处理器NAR中构建并捆绑WAR。
  • WAR需要nifi-processor-configuration在META-INF目录中包含一个文件,该文件将Custom UI与该处理器相关联。
  • 将NAR放在lib目录中,NiFi启动时会发现它。
  • 现在,在处理器的“配置处理器”窗口中,“属性”选项卡应具有一个Advanced按钮,该按钮将访问“自定义UI”。

例如,这是UpdateAttribute的NAR布局:
更新属性NAR布局

nifi-update-attribute-bundle │ ├── nifi-update-attribute-model │ ├── nifi-update-attribute-nar │ ├── nifi-update-attribute-processor │ ├── nifi-update-attribute-ui │ ├── pom.xml │ └── src │ └── main │ ├── java │ ├── resources │ └── webapp │ └── css │ └── images │ └── js │ └── META-INF │ │ └── nifi-processor-configuration │ └── WEB-INF │ └── pom.xml

其内容nifi-processor-configuration如下:

  1. org.apache.nifi.processors.attributes.UpdateAttribute:${project.groupId}:nifi-update-attribute-nar:${project.version}

还可以为Controller Services和Reporting Tasks实现自定义UI

2、Content Viewers(内容查看者)

要添加内容查看器:

  • 在处理器NAR中构建并捆绑WAR。
  • WAR需要nifi-content-viewer在META-INF目录中包含一个文件,该文件列出了受支持的内容类型。
  • 将NAR放在lib目录中,NiFi启动时会发现它。
  • 遇到匹配的内容类型时,内容查看器将生成适当的视图。

一个很好的例子是标准内容查看器的NAR布局:

nifi-standard-bundle │ ├── nifi-jolt-transform-json-ui │ ├── nifi-standard-content-viewer │ ├── pom.xml │ └── src │ └── main │ ├── java │ ├── resources │ └── webapp │ └── css │ └── META-INF │ │ └── nifi-content-viewer │ └── WEB-INF │ ├── nifi-standard-nar │ ├── nifi-standard-prioritizers │ ├── nifi-standard-processors │ ├── nifi-standard-reporting-tasks │ ├── nifi-standard-utils │ └── pom.xml

其内容nifi-content-viewer如下:

application/xml application/json text/plain text/csv avro/binary application/avro-binary application/avro+binary

十二、Command Line Tools(命令行工具)

1、tls-toolkit(tls工具包)

客户端/服务器操作模式来自于自动生成所需的TLS配置工件而不需要在集中位置执行该生成的需求。这简化了集群环境中的配置。由于我们不一定具有运行生成逻辑或受信任的证书颁发机构的中心位置,因此使用共享机密对客户端和服务器进行身份验证。
tls-toolkit通过对CA服务器和客户端发送的CSR的公钥进行HMAC验证来防止中间人攻击。共享密钥(令牌)用作HMAC密钥。
基本过程如下:

  • 客户端生成一个密钥对。
  • 客户端生成一个请求json负载,其中包含一个CSR和一个HMAC,其中令牌作为密钥,而CSR的公共密钥指纹作为数据。
  • 客户端通过指定的https端口连接到CA主机名,并验证CA证书的CN与主机名是否匹配(注意:由于我们目前不信任CA,因此不会增加安全性,这只是一种方法如果可能,请尽早排除错误)。
  • 服务器使用令牌作为密钥并使用CSR的公共密钥指纹作为数据从客户端有效负载验证HMAC。这证明客户端知道共享的机密,并且它希望使用该公钥对CSR进行签名。(注意:中间的人可以继续这样做,但是如果不使HMAC无效,就无法更改CSR,从而无法达到目的)。
  • 服务器对CSR进行签名,并发送回响应json负载,其中包含证书和HMAC,其中令牌作为密钥,而其公钥的指纹作为数据。
  • 客户端使用令牌作为密钥和TLS会话提供的证书公钥的指纹来验证响应HMAC。这验证了知道共享机密的CA是我们正在通过TLS与之交谈的CA。
  • 客户端验证来自TLS会话的CA证书是否已在有效负载中对证书进行了签名。
  • 客户端使用证书链将生成的KeyPair添加到其密钥库,并将TLS连接中的CA证书添加到其信任库。
  • 客户端写出包含密钥库,信任库密码和有关交换的其他详细信息的配置json。

    十三、Testing(测验)

    测试将在较大框架中使用的组件通常非常麻烦且棘手。借助NiFi,我们努力使测试组件尽可能简单。为此,我们创建了一个nifi-mock模块,可以将其与JUnit结合使用,以提供对组件的广泛测试。
    Mock框架主要针对测试处理器,因为它们是迄今为止最常用的扩展点。但是,该框架的确提供了测试Controller Services的能力。
    通常通过创建功能测试来验证组件的行为来测试组件。这样做是因为处理器通常由少数辅助方法组成,但是逻辑将主要包含在该onTrigger方法中。该TestRunner接口允许我们通过将更多“原始”对象(例如文件和字节数组)转换为FlowFiles来测试处理器和控制器服务,并处理创建处理器完成其工作所需的ProcessSessions和ProcessContext,以及在其中调用必要的生命周期方法。为了确保处理器在单元测试中的行为与生产中的行为相同。

    1、Instantiate TestRunner(实例化TestRunner)

    对于处理器或控制器服务的大多数单元测试都是从创建TestRunner 类的实例开始的。为了向处理器添加必要的类,可以使用Maven依赖项:

    1. <dependency>
    2. <groupId>org.apache.nifi</groupId>
    3. <artifactId>nifi-mock</artifactId>
    4. <version>${nifi version}</version>
    5. </dependency>

    我们TestRunner通过调用类的静态newTestRunner方法之一TestRunners(位于org.apache.nifi.util包中)来创建一个新方法。这些方法接受被测处理器的参数(可以是要测试的处理器的类,也可以是处理器的实例),并且还允许设置处理器名称。

    2、Add ControllerServices(添加ControllerServices)

    创建新的测试运行器后,我们可以将处理器服务所需的任何控制器服务添加到测试运行器中,以执行其工作。为此,我们调用addControllerService方法,并提供Controller Service的标识符和Controller Service的实例。
    如果控制器服务需要进行配置,它的性质可以通过调用被设置setProperty(ControllerService, PropertyDescriptor, String),setProperty(ControllerService, String, String)或setProperty(ControllerService, PropertyDescriptor, AllowableValue)方法。这些方法中的每一个都返回一个 ValidationResult。然后可以通过调用检查该对象以确保该属性有效isValid。可以通过调用setAnnotationData(ControllerService, String)方法来设置注释数据。
    现在,我们可以通过调用来确保Controller Service有效assertValid(ControllerService)-或通过调用来确保配置的值无效(如果测试Controller Service本身) assertNotValid(ControllerService)。
    将控制器服务添加到测试运行器并进行配置后,现在可以通过调用该enableControllerService(ControllerService)方法来启用它 。如果Controller Service无效,则此方法将引发IllegalStateException。否则,该服务现在可以使用了。

    3、Set Property Values(设置属性值)

    配置任何必要的控制器服务后,我们需要配置处理器。我们可以通过调用与Controller Services相同的方法来做到这一点,而无需指定任何Controller Service。即,我们可以致电setProperty(PropertyDescriptor, String),依此类推。每个setProperty方法再次返回一个ValidationResult属性,该属性可用于确保该属性值有效。
    同样,根据我们的期望,我们还可以调用assertValid()并assertNotValid()确保处理器的配置有效或无效。

    4、Enqueue FlowFiles(使FlowFile入队)

    在触发处理器运行之前,通常需要先排队FlowFiles以便处理器进行处理。这可以通过使用来实现enqueue的方法TestRunner类。该enqueue方法具有几个不同的覆盖,并允许在的形式被添加的数据byte[],InputStream或Path。这些方法中的每一个都还支持一个变体,该变体允许Map添加以支持FlowFile属性。
    此外,还有一种enqueue方法可以使用FlowFile对象的可变参数。例如,这对于获取处理器的输出,然后将其提供给处理器的输入很有用。

    5、Run the Processor(运行处理器)

    在配置Controller Services并将所需的FlowFiles放入队列之后,可以通过调用run方法来触发Processor运行TestRunner。如果不带任何参数调用此方法,则它将使用@OnScheduled注释调用Processor中的任何onTrigger方法,一次调用Processor的方法,然后运行the@OnUnscheduled和finally@OnStopped方法。
    如果希望在触发onTrigger其他事件@OnUnscheduled和 @OnStopped生命周期事件之前运行该方法的几次迭代,run(int)则可以使用该方法来指定onTrigger应调用的迭代次数。
    有时我们想触发处理器运行,但不触发@OnUnscheduled和@OnStopped 生命周期事件。例如,这对于在发生这些事件之前检查处理器的状态很有用。可以使用run(int, boolean)和传递false作为第二个参数来实现。但是,执行此操作之后,调用@OnScheduled生命周期方法可能会导致问题。结果,onTrigger通过使用方法的run(int,boolean,boolean)版本run并将其false作为第三个参数传递,我们现在可以再次运行而不会导致这些事件发生。
    如果测试在多个线程中发生的行为很有用,也可以通过调用setThreadCount方法来实现 TestRunner。默认为1个线程。如果使用多个线程,请务必记住,的run调用TestRunner指定了应触发处理器的次数,而不是指定每个线程应触发处理器的次数。因此,如果线程计数设置为2但 run(1)被调用,则将仅使用一个线程。

    6、Validata Output(验证输出)

    在处理器完成运行之后,单元测试通常将要验证FlowFiles是否到达了预期的位置。这可以使用TestRunners assertAllFlowFilesTransferred和 assertTransferCount方法来实现。前一种方法将一个Relationship和一个整数指定为参数,以指示应将多少FlowFiles传输到该Relationship。除非将这个数量的FlowFiles转移到给定的Relationship或任何FlowFile转移到任何其他Relationship,否则该方法将无法通过单元测试。该assertTransferCount方法仅验证FlowFile计数是给定Relationship的预期数量。
    在验证计数之后,我们可以通过该getFlowFilesForRelationship方法获得实际的输出FlowFiles 。此方法返回List。请务必注意,列表的类型是MockFlowFile,而不是FlowFile接口。这样做是因为MockFlowFile提供了许多验证内容的方法。
    例如,MockFlowFile具有用于断言FlowFile属性存在(方法assertAttributeExists),断言其他属性不存在(assertAttributeNotExists),或者属性具有正确的值(assertAttributeEquals,assertAttributeNotEquals)。存在用于验证FlowFile内容的类似方法。FlowFile的内容可以与byte[],和InputStream,文件或字符串进行比较。如果期望数据是文本数据,则首选String版本,因为如果输出与预期的不同,它会提供更直观的错误消息。

    7、Mocking External Resources(模拟外部资源)

    测试连接到远程资源的NiFi处理器时,最大的问题之一是我们实际上不想从单元测试中连接到某个远程资源。我们可以在单元测试中自己站起来一个简单的服务器,并配置处理器与之通信,但是随后我们必须了解并实现特定于服务器的规范,并且可能无法正确发送回错误消息等。想要测试。
    通常,此处采用的方法是在Processor中拥有一种方法,该方法负责获取与远程资源的连接或客户端。我们通常将此方法标记为受保护。在单元测试中,不是TestRunner通过调用TestRunners.newTestRunner(Class)并提供Processor类来创建,而是在单元测试中创建Processor的子类,并使用此子类:

    1. @Test
    2. public void testConnectionFailure() {
    3. final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    4. protected Client getClient() {
    5. // Return a mocked out client here.
    6. return new Client() {
    7. public void connect() throws IOException {
    8. throw new IOException();
    9. }
    10. // ...
    11. // other client methods
    12. // ...
    13. };
    14. }
    15. });
    16. // rest of unit test.
    17. }

    这使我们能够实现模拟所有网络通信并返回我们要测试的不同错误结果的客户端,并确保我们的逻辑正确地处理了对客户端的成功调用。

    8、Additional Testing Capabilities(其他测试功能)

    除了测试框架提供的上述功能外,TestRunner还提供了几种方便的方法来验证处理器的行为。提供了用于确保处理器的输入队列已清空的方法。单元测试能够获取TestRunner将使用的ProcessContext,ProcessSessionFactory,ProvenanceReporter和其他特定于框架的实体。该shutdown方法可以测试注释为仅在关闭NiFi时运行的处理器方法。可以为使用自定义用户界面的处理器设置注释数据。最后,可以通过该setThreadCount(int)方法设置用于运行处理器的线程数。

    十四、NiFi Archives(NAR)

    当来自许多不同组织的软件都托管在同一环境中时,Java ClassLoader很快就会成为一个问题。如果多个组件都依赖于同一个库,但是每个组件都依赖于不同的版本,则会出现许多问题,通常会导致意外的行为或NoClassDefFoundError发生错误。为了防止这些问题成为问题,NiFi引入了NiFi存档或NAR的概念。
    NAR允许将多个组件及其依赖项一起打包到一个包中。然后提供NAR包与其他NAR包的ClassLoader隔离。开发人员应始终将其NiFi组件部署为NAR软件包。
    为此,开发人员创建了一个新的Maven Artifact,我们将其称为NAR工件。包装设置为nar。dependencies然后创建POM的部分,以使NAR依赖于要包含在NAR中的所有NiFi组件。
    为了使用的包装nar,我们必须使用nifi-nar-maven-plugin模块。通过将以下代码段添加到NAR的pom.xml中,可以将其包括在内:

    1. <build>
    2. <plugins>
    3. <plugin>
    4. <groupId>org.apache.nifi</groupId>
    5. <artifactId>nifi-nar-maven-plugin</artifactId>
    6. <version>1.1.0</version>
    7. <extensions>true</extensions>
    8. </plugin>
    9. </plugins>
    10. </build>

    在Apache NiFi代码库中,这存在于NiFi根POM中,所有其他NiFi工件(除了nifi-nar-maven-plugin本身)都从该NiPO根POM继承,因此我们不需要将其包含在其他任何一个中POM文件。
    NAR可以具有类型为的一个依赖项nar。如果指定了多个类型为的依赖项 nar,则nifi-nar-maven-plugin将出错。如果NAR A添加了对NAR B的依赖关系,则不会导致NAR B打包NAR A的所有组件。相反,这会将一个Nar-Dependency-Id元素添加到MANIFEST.MF NAR A的文件中。这将导致设置NAR B的ClassLoader作为NAR A的父级类加载器。在这种情况下,我们将NAR B称为NAR A的父级。
    父类加载器的这种链接是NiFi用来使Controller Services在所有NAR之间共享的机制。如开发ControllerService 部分中所述,Controller Service必须分为可扩展的接口ControllerService和实现该接口的实现。只要Controller Service实施和Processor共享相同的Controller Service接口定义,就可以从任何Processor中引用Controller Services,无论它位于哪个NAR中。
    为了共享相同的定义,处理器的NAR和控制器服务实现的NAR必须都具有控制器服务定义的NAR作为父代。示例层次结构可能如下所示:

    root ├── my-controller-service-api │ ├── pom.xml │ └── src │ └── main │ └── java │ └── org │ └── my │ └── services │ └── MyService.java │ ├── my-controller-service-api-nar │ └── pom.xml │ │ │ ├── my-controller-service-impl │ ├── pom.xml │ └── src │ ├── main │ │ ├── java │ │ │ └── org │ │ │ └── my │ │ │ └── services │ │ │ └── MyServiceImpl.java │ │ └── resources │ │ └── META-INF │ │ └── services │ │ └── org.apache.nifi.controller.ControllerService │ └── test │ └── java │ └── org │ └── my │ └── services │ └── TestMyServiceImpl.java │ │ ├── my-controller-service-nar │ └── pom.xml │ │ └── other-processor-nar └── pom.xml

  1. 该POM文件的类型为nar。它依赖 nifi-standard-services-api-nar。
  2. 该POM文件为类型jar。它依赖 my-controller-service-api。它不具有任何依赖 nar的工件。
  3. 该POM文件的类型为nar。它依赖 my-controller-service-api-nar。

尽管一开始这些看起来很复杂,但是在创建了这样的层次结构一次或两次之后,它就变得不那么复杂了。请注意,这里 my-controller-service-api-nar的依赖于 nifi-standard-services-api-nar。这样做是为了使任何依赖的NAR my-controller-service-api-nar也将能够访问所提供的所有Controller Services nifi-standard-services-api-nar,例如SSLContextService。同样,不必为每个服务创建不同的“ service-api” NAR。取而代之的是,通常有一个单一的“ service-api” NAR封装了许多不同Controller Services的API,就像nifi-standard-services-api-nar。通常,API不会包含广泛的依赖关系,因此,ClassLoader隔离可能不太重要,因此将许多API工件组合到同一NAR中通常是可以接受的。

十五、Per-Instance ClassLoading(每个实例的类加载)

组件开发人员可能希望在运行时将其他资源添加到组件的类路径中。例如,您可能想将JDBC驱动程序的位置提供给与关系数据库进行交互的处理器,从而允许处理器使用任何驱动程序,而不是尝试将驱动程序捆绑到NAR中。
这可以通过声明一个或多个dynamicallyModifiesClasspath设置为true的PropertyDescriptor实例 来实现。例如:

  1. PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder()
  2. .name("Extra Resources")
  3. .description("The path to one or more resources to add to the classpath.")
  4. .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
  5. .expressionLanguageSupported(true)
  6. .dynamicallyModifiesClasspath(true)
  7. .build();

在组件上设置这些属性后,框架会识别所有dynamicallyModifiesClasspath设置为true的属性 。对于这些属性中的每一个,框架都尝试根据属性的值来解析文件系统资源。该值可以是一个或多个目录或文件的逗号分隔列表,其中会跳过不存在的任何路径。如果资源代表目录,则会列出该目录,并将该目录中的所有文件分别添加到类路径中。这些目录也将被扫描以查找本机库。如果在这些目录之一中找到一个库,则会在加载该OS之前将其创建并缓存一个OS处理的临时副本,以保持一致性和隔离类加载器。
每个属性都可能通过验证器对值的格式施加进一步的限制。例如,使用StandardValidators.FILE_EXISTS_VALIDATOR将属性限制为接受单个文件。使用StandardValidators.NON_EMPTY_VALIDATOR允许以逗号分隔的文件或目录的任意组合。
通过将资源添加到始终首先检查的内部ClassLoader,将资源添加到实例ClassLoader。只要这些属性的值改变,内部的ClassLoader就会关闭并使用新资源重新创建。
NiFi提供了@RequiresInstanceClassLoading注释,以进一步扩展和隔离组件的类路径上可用的库。您可以使用注释组件,@RequiresInstanceClassLoading 以指示该组件的实例ClassLoader需要该组件的NAR ClassLoader中所有资源的副本。如果@RequiresInstanceClassLoading不存在,则实例ClassLoader只是将其父ClassLoader设置为NAR ClassLoader,而不是复制资源。
该@RequiresInstanceClassLoading注释还提供了一个可选的标志`cloneAncestorResources’。如果设置为true,则实例ClassLoader将包含祖先资源,直到第一个ClassLoader包含该组件引用的控制器服务API,或者直到Jetty NAR。如果设置为false或未指定,则仅包含组件NAR中的资源。
因为@RequiresInstanceClassLoading为组件的每个实例从NAR ClassLoader复制资源,所以请谨慎使用此功能。如果创建了一个组件的十个实例,则该组件的NAR ClassLoader中的所有类将被加载到内存中十次。创建足够多的组件实例后,这最终可能会显着增加内存占用量。
此外,在使用Controller Services时使用@RequiresInstanceClassLoading时有一些限制。处理器,报告任务和控制器服务可以在其属性描述符之一中引用控制器服务API。当Controller Service API与引用它的组件或Controller Service实现捆绑在同一NAR中时,可能会出现问题。如果遇到以上任何一种情况,并且扩展要求实例加载,则将跳过该扩展,并记录一个适当的错误。要解决此问题,Controller Service API应该捆绑在父NAR中。引用该服务的服务实现和扩展应取决于Controller Service API NAR。请参阅以下内容中的Controller Service NAR布局NiFi存档(NAR)部分。每当Controller Service API与要求它的扩展捆绑在一起时,即使不使用@RequiresInstanceClassLoading,也会记录警告以帮助避免这种不良做法。

十六、Deprecating a Component(弃用组件)

有时可能需要弃用组件。无论何时发生这种情况,开发人员都可以使用@DeprecationNotice批注来指示某个组件已被弃用,从而允许开发人员描述弃用原因并建议其他组件。下面是如何执行此操作的示例:

  1. @DeprecationNotice(alternatives = {ListenSyslog.class}, classNames = {"org.apache.nifi.processors.standard.ListenRELP"}, reason = "Technology has been superseded", )
  2. public class ListenOldProtocol extends AbstractProcessor {