Apache NiFi Connector

此连接器提供可以读取和写入Apache NiFi的源(Source)和槽(Sink). 要使用此连接器,请将以下依赖项添加到您的项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
  4. <version>{{site.version }}</version>
  5. </dependency>

请注意,流连接器当前不是二进制分发的一部分。有关如何将程序与程序库打包以进行集群执行的信息,请参阅 此处

安装 Apache NiFi

有关设置Apache NiFi集群的说明可以在这里找到。

Apache NiFi 源

该连接器提供了从Apache NiFi到Apache Flink读取数据的源。

NiFiSource(…)类提供2个构建器(constructors),用于从NiFi读取数据。

  • NiFiSource(SiteToSiteConfig config) - 为指定客户端的SiteToSiteConfig构建一个NiFiSource(…),默认等待时间为1000 ms。

  • NiFiSource(SiteToSiteConfig config, long waitTimeMs) - 为指定客户端的SiteToSiteConfig和指定的等待时间(以毫秒为单位)构建一个NiFiSource(…)

示例:

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSource = new NiFiSource(clientConfig)

这里的数据从Apache NiFi输出端口读取,该端口称为“Data for Flink”,这是Apache NiFi站点到站点协议配置的一部分。

Apache NiFi 槽(Sink)

连接器提供了一个槽(Sink),用于将数据从Apache Flink写入Apache NiFi。

NiFiSink(…) 类提供了一个实例化NiFiSink的构造函数。

  • NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)为指定客户端的SiteToSiteConfigNiFiDataPacketBuilder构造了一个NiFiSink(…),它将数据从Flink转换为NiFiDataPacket,将由NiFi进行获取。

示例:

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
  8. streamExecEnv.addSink(nifiSink);
  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
  8. streamExecEnv.addSink(nifiSink)