译者:flink.sojb.cn

此连接器提供可以读取和写入Apache NiFi的源和接收器 。要使用此连接器,请将以下依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-nifi_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

请注意,流连接器当前不是二进制分发的一部分。 有关如何使用库将程序打包以执行集群的信息,请参见 此处

安装Apache NiFi

有关设置Apache NiFi群集的说明,请 访问此处

Apache NiFi Source

连接器提供了一个Source,用于从Apache NiFi到Apache Flink读取数据。

该类NiFiSource(…)提供了2个用于从NiFi读取数据的构造函数。

  • NiFiSource(SiteToSiteConfig config)- 构造NiFiSource(…)给定客户端的SiteToSiteConfig,默认等待时间为1000毫秒。

  • NiFiSource(SiteToSiteConfig config, long waitTimeMs)- 构造NiFiSource(…)给定客户端的SiteToSiteConfig和指定的等待时间(以毫秒为单位)。

例:

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data for Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSource = new NiFiSource(clientConfig)

这里的数据是从名为“Data for Flink”的Apache NiFi输出端口读取的,这是Apache NiFi站点到站点协议配置的一部分。

Apache NiFi Sink

连接器提供了一个Sink,用于将数据从Apache Flink写入Apache NiFi。

该类NiFiSink(…)提供了一个用于实例化a的构造函数NiFiSink

  • NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder&lt;T&gt;)构造一个NiFiSink(…)给定客户端SiteToSiteConfig和一个NiFiDataPacketBuilder将来自Flink的数据转换NiFiDataPacket为由NiFi摄取的数据。

例:

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig();
  7. SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
  8. streamExecEnv.addSink(nifiSink);
  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](d259ca1a487802fa926f8585cee068d7) {...})
  8. streamExecEnv.addSink(nifiSink)

有关Apache NiFi站点到站点协议的更多信息,请访问此处