RabbitMQ Connector

RabbitMQ连接器许可证

Flink下的RabbitMQ连接器位于一个maven依赖” RabbitMQ AMQP Java Clien”上,由Mozilla Public License v1.1 (MPL 1.1) 许可。

Flink本身不重写” RabbitMQ AMQP Java Clien”中的源码,也不对其进行打包成二进制文件。 用户基于flink的rabbitMQ连接器(即RabbitMQ AMQP Java Clien)创建和发布拓展开的工作,可能会受到Mozilla Public License v1.1 (MPL 1.1)说明的一些限制。

RabbitMQ连接器

该连接器访问流数据来源RabbitMQ。为使用连接器,请添加如下依赖在你的项目中,

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  4. <version>1.2.0</version>
  5. </dependency>

译者注:上述方法是由maven构建项目时使用,当使用sbt构建项目时,需要在build.sbt中的正确子项目中添加如下:

  1. ("org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion).
  2. exclude("org.apache.flink","flink-shaded-hadoop1_2.10"),
  3. 如果使用scala2.10,则不需要exclude.

注意的是,流连接器当前都不是二元分布。更多集群执行请看这里

安装RabbitMQ

阅读Rabbitmq下载页的介绍。安装后,服务器会自动启动,应用程序连接rabbitmq将会启动。

Rabbitmq数据源

连接器提供一个类RMQSource,以消费源自于rabbitmq队列里的消息。消费RabbitMQ数据源可有三个不同层级保证,取决于flink的配置如何。

1, 仅有一次,为实现保证仅有一次消费rabbitmq数据源,如下是需要的—

  • 可检查点:检查点生效后,在检查点完成后,消息是互相确认的(因此,会把消息从rabbitmq中删除)。

  • 使用相关编号:相关编号是rabbitmq应用的特征,当提交一个消息进rabbitmq时,必须得在消息配置中设置一个相关编号。在检查点恢复是,源利用相关编号去重已经被处理过的数据,

  • 非并行的源:实现仅有一次,源必须非并行(并行度为1)。这个限制是因为rabbitmq是从一个单一队列存在多个消费者的调度消息方式。

2, 至少一次:当检查点生效,但是没有使用相关编号或者源是并行的,源仅仅提供至少消费一次的保证。

3, 没有保证:如果检查点未生效,源没有任何强分发的保证。这种设置下,替代flink的检查点,一旦接收和处理消息后,消息将自动确认。

如下代码是设置成仅一次消费的例子。注释内容解释哪部分设置可忽略,以得到更多灵活保证。

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.enableCheckpointing(...);// 仅一次或至少一次,检查点是必须的
  3. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  4. .setHost("localhost")
  5. .setPort(5000)
  6. ...
  7. .build();
  8. final DataStream<String> stream = env
  9. .addSource(new RMQSource<String>(
  10. connectionConfig, // rabbitmq连接的配置
  11. "queueName", // rabbitmq的队列名,消费的队列名
  12. true, // 使用相关编号,至少一次时设置为false
  13. new SimpleStringSchema())) // 反序列化成java的对象
  14. .setParallelism(1); // 非并行是仅一次所必须的
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.enableCheckpointing(...)
  3. val connectionConfig = new RMQConnectionConfig.Builder()
  4. .setHost("localhost")
  5. .setPort(5000)
  6. ...
  7. .build
  8. val stream = env
  9. .addSource(new RMQSource[String](
  10. connectionConfig, // 配置
  11. "queueName", // 队列名
  12. true, // 使用相关编号
  13. new SimpleStringSchema)) // 反序列化
  14. .setParallelism(1) // 非序列化

Rabbitmq接收

连接器提供类RMQSink来发送消息到rabbitmq队列里,以下代码是一个rabbitmq接收的配置例子,

  1. final DataStream<String> stream = ...
  2. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build();
  7. stream.addSink(new RMQSink<String>(
  8. connectionConfig,
  9. "queueName",
  10. new SimpleStringSchema())); //序列化
  11. val stream: DataStream[String] = ...
  1. val connectionConfig = new RMQConnectionConfig.Builder()
  2. .setHost("localhost")
  3. .setPort(5000)
  4. ...
  5. .build
  6. stream.addSink(new RMQSink[String](
  7. connectionConfig,
  8. "queueName",
  9. new SimpleStringSchema))