译者:flink.sojb.cn

此连接器提供将数据写入Apache Cassandra数据库的接收器。

要使用此连接器,请将以下依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-cassandra_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

请注意,流连接器当前不是二进制分发的一部分。在此处了解如何与群集执行链接。

安装Apache Cassandra

有多种方法可以在本地计算机上启动Cassandra实例:

  1. 按照Cassandra入门页面上的说明进行 算子操作。
  2. 官方Docker Repository启动一个运行Cassandra的容器

Cassandra Sinks

配置

Flink的Cassandra接收器是使用静态CassandraSink.addSink(DataStream)创建的输入法。这个方法返回一个CassandraSinkBuilder,它提供了进一步配置接收器的方法,最后是build()接收器实例。

可以使用以下配置方法:

  1. setQuery(字符串查询)

    • 设置为接收器接收的每个记录执行的upsert查询。
    • 该查询在内部被视为CQL语句。
    • DO设置upsert查询以处理Tuple数据类型。
    • 请勿设置查询以处理POJO数据类型。
  2. setClusterBuilder()

    • 将用于配置与cassandra的连接的集群构建器设置为更复杂的设置,例如一致性级别,重试策略等。
  3. setHost(String host [,int port])

    • setClusterBuilder()的简单版本,包含连接到Cassandra实例的主机/端口信息
  4. setMapperOptions(MapperOptions选项)

    • 设置用于配置DataStax ObjectMapper的映射器选项。
    • 仅在处理POJO数据类型时适用。
  5. enableWriteAheadLog([CheckpointCommitter committer])

    • 一个Optional设置
    • 允许对非确定性算法进行一次性处理。
  6. 建立()

    • 完成配置并构造CassandraSink实例。

预写日志

检查点提交者在某些资源中存储有关已完成检查点的其他信息。此信息用于防止在发生故障时完整重播最后完成的检查点。您可以使用a CassandraCommitter将它们存储在cassandra的单独表中。请注意,Flink不会清理此表。

如果查询是幂等的(意味着可以多次应用而不更改结果)并且启用了检查点,则Flink可以提供一次性保证。如果发生故障,将完全重播失败的检查点。

此外,对于非确定性程序,必须启用预写日志。对于这样的程序,重放检查点可能与先前的尝试完全不同,这可能使数据库处于不一致状态,因为可能已经写入了第一次尝试的部分。预写日志保证重放的检查点与第一次尝试相同。请注意,启用此函数会对延迟产生负面影响。

注意:预写日志函数目前是实验性的。在许多情况下,使用连接器而不启用它就足够了。请将问题报告给开发邮件列表。

检查点和容错

启用检查点后,Cassandra Sink保证至少一次向C *实例传递 算子操作请求。

有关检查点文档容错保证文档的更多详细信息

例子

Cassandra接收器当前支持Tuple和POJO数据类型,Flink自动检测使用哪种类型的输入。有关那些流数据类型的一般用例,请参阅支持的数据类型。我们分别针对Pojo和Tuple数据类型展示了基于SocketWindowWordCount的两个实现。

在所有这些示例中,我们假设已创建关联的Keyspace example和表wordcount

  1. CREATE KEYSPACE IF NOT EXISTS example
  2. WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
  3. CREATE TABLE IF NOT EXISTS example.wordcount (
  4. word text,
  5. count bigint,
  6. PRIMARY KEY(word)
  7. );

用于流式元组数据类型的Cassandra Sink示例

在将结果与Java / Scala Tuple数据类型存储到Cassandra接收器时,需要设置CQL upsert语句(通过setQuery(’stmt’))将每条记录保存回数据库。将upsert查询缓存为PreparedStatement,每个Tuple数据元都转换为语句的参数。

有关细节PreparedStatementBoundStatement信息,请访问DataStax Java驱动程序手册

  1. // get the execution environment
  2. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // get input data by connecting to the socket
  4. DataStream<String> text = env.socketTextStream(hostname, port, "\n");
  5. // parse the data, group it, window it, and aggregate the counts
  6. DataStream<Tuple2<String, Long>> result = text
  7. .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  8. @Override
  9. public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
  10. // normalize and split the line
  11. String[] words = value.toLowerCase().split("\\s");
  12. // emit the pairs
  13. for (String word : words) {
  14. //Do not accept empty word, since word is defined as primary key in C* table
  15. if (!word.isEmpty()) {
  16. out.collect(new Tuple2<String, Long>(word, 1L));
  17. }
  18. }
  19. }
  20. })
  21. .keyBy(0)
  22. .timeWindow(Time.seconds(5))
  23. .sum(1);
  24. CassandraSink.addSink(result)
  25. .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  26. .setHost("127.0.0.1")
  27. .build();
  1. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  2. // get input data by connecting to the socket val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
  3. // parse the data, group it, window it, and aggregate the counts val result: DataStream[(String, Long)] = text
  4. // split up the lines in pairs (2-tuples) containing: (word,1)
  5. .flatMap(_.toLowerCase.split("\\s"))
  6. .filter(_.nonEmpty)
  7. .map((_, 1L))
  8. // group by the tuple field "0" and sum up tuple field "1"
  9. .keyBy(0)
  10. .timeWindow(Time.seconds(5))
  11. .sum(1)
  12. CassandraSink.addSink(result)
  13. .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  14. .setHost("127.0.0.1")
  15. .build()
  16. result.print().setParallelism(1)

用于流式传输POJO数据类型的Cassandra Sink示例

流式传输POJO数据类型并将相同的POJO实体存储回Cassandra的示例。此外,此POJO实现需要遵循DataStax Java驱动程序手册来注释类,因为此实体的每个字段都使用DataStax Java Driver com.datastax.driver.mapping.Mapper类映射到指定表的关联列。

可以通过放置在Pojo类中的字段声明上的注释来定义每个表列的映射。有关映射的详细信息,请参阅有关映射类CQL数据类型定义的 CQL文档

  1. // get the execution environment
  2. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // get input data by connecting to the socket
  4. DataStream<String> text = env.socketTextStream(hostname, port, "\n");
  5. // parse the data, group it, window it, and aggregate the counts
  6. DataStream<WordCount> result = text
  7. .flatMap(new FlatMapFunction<String, WordCount>() {
  8. public void flatMap(String value, Collector<WordCount> out) {
  9. // normalize and split the line
  10. String[] words = value.toLowerCase().split("\\s");
  11. // emit the pairs
  12. for (String word : words) {
  13. if (!word.isEmpty()) {
  14. //Do not accept empty word, since word is defined as primary key in C* table
  15. out.collect(new WordCount(word, 1L));
  16. }
  17. }
  18. }
  19. })
  20. .keyBy("word")
  21. .timeWindow(Time.seconds(5))
  22. .reduce(new ReduceFunction<WordCount>() {
  23. @Override
  24. public WordCount reduce(WordCount a, WordCount b) {
  25. return new WordCount(a.getWord(), a.getCount() + b.getCount());
  26. }
  27. });
  28. CassandraSink.addSink(result)
  29. .setHost("127.0.0.1")
  30. .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
  31. .build();
  32. @Table(keyspace = "example", name = "wordcount")
  33. public class WordCount {
  34. @Column(name = "word")
  35. private String word = "";
  36. @Column(name = "count")
  37. private long count = 0;
  38. public WordCount() {}
  39. public WordCount(String word, long count) {
  40. this.setWord(word);
  41. this.setCount(count);
  42. }
  43. public String getWord() {
  44. return word;
  45. }
  46. public void setWord(String word) {
  47. this.word = word;
  48. }
  49. public long getCount() {
  50. return count;
  51. }
  52. public void setCount(long count) {
  53. this.count = count;
  54. }
  55. @Override
  56. public String toString() {
  57. return getWord() + " : " + getCount();
  58. }
  59. }