License of the RabbitMQ Connector

Flink’s RabbitMQ connector defines a Maven dependency on the “RabbitMQ AMQP Java Client”, is triple-licensed under the Mozilla Public License 1.1 (“MPL”), the GNU General Public License version 2 (“GPL”) and the Apache License version 2 (“ASL”).
Flink的RabbitMQ连接器定义了对“ RabbitMQ AMQP Java客户端”的Maven依赖关系,在Mozilla Public License 1.1(“ MPL”),GNU General Public License版本2(“ GPL”)和Apache License版本2下获得了三重许可。 (以下简称“ ASL”)。

Flink itself neither reuses source code from the “RabbitMQ AMQP Java Client” nor packages binaries from the “RabbitMQ AMQP Java Client”.
Flink本身既不重用“ RabbitMQ AMQP Java客户端”中的源代码,也不打包“ RabbitMQ AMQP Java客户端”中的二进制文件。

Users that create and publish derivative work based on Flink’s RabbitMQ connector (thereby re-distributing the “RabbitMQ AMQP Java Client”) must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 (“MPL”), the GNU General Public License version 2 (“GPL”) and the Apache License version 2 (“ASL”).
基于Flink的RabbitMQ连接器创建并发布衍生作品(从而重新分发“ RabbitMQ AMQP Java客户端”)的用户必须注意,这可能要遵守Mozilla Public License 1.1(“ MPL”)(即GNU)中声明的条件。通用公共许可证版本2(“ GPL”)和Apache许可证版本2(“ ASL”)。

RabbitMQ Connector

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:
该连接器提供对RabbitMQ数据流的访问。要使用此连接器,请将以下依赖项添加到您的项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-rabbitmq_2.11</artifactId>
  4. <version>1.7.1</version>
  5. </dependency>

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.
请注意,流连接器当前不是二进制分发的一部分。请参阅此处链接以执行集群。

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.
请遵循RabbitMQ下载页面上的说明。安装之后,服务器将自动启动,并且可以启动连接到RabbitMQ的应用程序。

RabbitMQ Source

This connector provides a RMQSource class to consume messages from a RabbitMQ queue. This source provides three different levels of guarantees, depending on how it is configured with Flink:
该连接器提供了一个RMQSource使用来自RabbitMQ队列的消息的类。此来源提供三种不同级别的保证,具体取决于如何使用Flink进行配置:

  1. Exactly-once: In order to achieve exactly-once guarantees with the RabbitMQ source, the following is required -
  2. 精确一次:为了使用RabbitMQ源实现精确一次的保证,需要以下内容-

    • Enable checkpointing: With checkpointing enabled, messages are only acknowledged (hence, removed from the RabbitMQ queue) when checkpoints are completed.
    • 启用检查点:启用检查点后,仅在完成检查点时才确认消息(因此,从RabbitMQ队列中删除了消息)。
    • Use correlation ids: Correlation ids are a RabbitMQ application feature. You have to set it in the message properties when injecting messages into RabbitMQ. The correlation id is used by the source to deduplicate any messages that have been reprocessed when restoring from a checkpoint.
    • 使用相关ID:相关ID是RabbitMQ应用程序的功能。将消息注入RabbitMQ时,必须在消息属性中进行设置。从检查点还原时,源使用相关ID对重复处理的所有消息进行重复数据删除。
    • Non-parallel source: The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQ’s approach to dispatching messages from a single queue to multiple consumers.
    • 非并行源:源必须是非并行的(并行度设置为1)才能一次。此限制主要是由于RabbitMQ的方法将消息从单个队列分发到多个使用者。
  3. At-least-once: When checkpointing is enabled, but correlation ids are not used or the source is parallel, the source only provides at-least-once guarantees.
  4. 至少一次:启用检查点但未使用关联ID或源为并行时,源仅提供至少一次保证。
  5. No guarantee: If checkpointing isn’t enabled, the source does not have any strong delivery guarantees. Under this setting, instead of collaborating with Flink’s checkpointing, messages will be automatically acknowledged once the source receives and processes them.
  6. 保证:如果未启用检查点,则源没有任何有力的交付保证。在此设置下,源将在接收并处理消息后自动确认消息,而不是与Flink的检查点协作。
    Below is a code example for setting up an exactly-once RabbitMQ source. Inline comments explain which parts of the configuration can be ignored for more relaxed guarantees.
    下面是一个代码示例,用于设置一次精确的RabbitMQ源。内联注释说明可以忽略配置的哪些部分以获得更宽松的保证。
  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...);
  4. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build();
  9. final DataStream<String> stream = env
  10. .addSource(new RMQSource<String>(
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1); // non-parallel source is only required for exactly-once
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // checkpointing is required for exactly-once or at-least-once guarantees env.enableCheckpointing(...)
  3. val connectionConfig = new RMQConnectionConfig.Builder()
  4. .setHost("localhost")
  5. .setPort(5000)
  6. ...
  7. .build
  8. val stream = env
  9. .addSource(new RMQSource[String](784e93ad214998a2ffcfbcb0e7775cfc)) // deserialization schema to turn messages into Java objects
  10. .setParallelism(1) // non-parallel source is only required for exactly-once

RabbitMQ Sink

This connector provides a RMQSink class for sending messages to a RabbitMQ queue. Below is a code example for setting up a RabbitMQ sink.
该连接器提供了RMQSink用于将消息发送到RabbitMQ队列的类。以下是设置RabbitMQ接收器的代码示例。

  1. final DataStream<String> stream = ...
  2. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build();
  7. stream.addSink(new RMQSink<String>(
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema())); // serialization schema to turn Java objects to messages
  1. val stream: DataStream[String] = ...
  2. val connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build
  7. stream.addSink(new RMQSink[String](bed16f1cf61e8405390ebc62f19671d0)) // serialization schema to turn Java objects to messages

More about RabbitMQ can be found here.