Apache Cassandra Connector

This connector provides sinks that write data into an Apache Cassandra database.

To use this connector, add the following dependency to your project:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-cassandra_2.11</artifactId>
      <version>1.7.1</version>
    </dependency>

Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Cassandra

There are multiple ways to bring up a Cassandra instance on a local machine:

  1. Follow the instructions from the Cassandra Getting Started page.
  2. Launch a container running Cassandra from the Official Docker Repository.

Cassandra Sinks

Configurations

Flink's Cassandra sink is created by using the static CassandraSink.addSink(DataStream<IN> input) method. This method returns a CassandraSinkBuilder, which offers methods to further configure the sink and finally build() the sink instance.

The following configuration methods can be used:

  1. setQuery(String query)
    • Sets the upsert query that is executed for every record the sink receives.
    • The query is internally treated as a CQL statement.
    • DO set the upsert query when processing Tuple data types.
    • DO NOT set the query when processing POJO data types.
  2. setClusterBuilder()
    • Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level, retry policy, etc. (see the sketch after this list).
  3. setHost(String host[, int port])
    • Simple version of setClusterBuilder() with host/port information to connect to Cassandra instances.
  4. setMapperOptions(MapperOptions options)
    • Sets the mapper options that are used to configure the DataStax ObjectMapper.
    • Only applies when processing POJO data types.
  5. enableWriteAheadLog([CheckpointCommitter committer])
    • An optional setting.
    • Allows exactly-once processing for non-deterministic algorithms.
  6. setFailureHandler([CassandraFailureHandler failureHandler])
    • An optional setting.
    • Sets the custom failure handler.
  7. build()
    • Finalizes the configuration and constructs the CassandraSink instance.
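
A minimal sketch of the full builder chain (assuming a DataStream<Tuple2<String, Long>> named result as in the Tuple example below; the contact point, port, consistency level, and retry policy shown are placeholder choices, not required settings):

    CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setClusterBuilder(new ClusterBuilder() {
            // ClusterBuilder comes from org.apache.flink.streaming.connectors.cassandra;
            // Cluster, QueryOptions, ConsistencyLevel and DefaultRetryPolicy are DataStax driver classes
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder
                    .addContactPoint("127.0.0.1")   // placeholder host
                    .withPort(9042)                 // default native transport port
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
                    .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
                    .build();
            }
        })
        .build();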

Write-ahead Log

A checkpoint committer stores additional information about completed checkpoints in some resource. This information is used to prevent a full replay of the last completed checkpoint in case of a failure. You can use a CassandraCommitter to store this information in a separate table in Cassandra. Note that this table will NOT be cleaned up by Flink.
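
As a rough sketch of how the write-ahead log is wired up (assuming a ClusterBuilder instance named clusterBuilder and the result stream from the Tuple example below; the names are illustrative):

    CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setClusterBuilder(clusterBuilder)
        // the committer records completed checkpoints in its own Cassandra table,
        // which Flink will not clean up
        .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
        .build();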

Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple times without changing the result) and checkpointing is enabled. In case of a failure the failed checkpoint will be replayed completely.

Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program the replayed checkpoint may be completely different from the previous attempt, which may leave the database in an inconsistent state since part of the first attempt may already be written. The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt. Note that enabling this feature will have an adverse impact on latency.

Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.

Checkpointing and Fault Tolerance

With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance.
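
Checkpointing itself is enabled on the execution environment; a minimal sketch (the 5000 ms interval is an arbitrary value for illustration):

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // trigger a checkpoint every 5 seconds (illustrative interval)
    env.enableCheckpointing(5000);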

More details are available in the checkpointing docs and the fault tolerance guarantees docs.

Examples

The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For the general use of these streaming data types, please refer to Supported Data Types. We show two implementations based on SocketWindowWordCount, for POJO and Tuple data types respectively.

In all these examples, we assume that the associated keyspace example and table wordcount have been created.

    CREATE KEYSPACE IF NOT EXISTS example
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

    CREATE TABLE IF NOT EXISTS example.wordcount (
        word text,
        count bigint,
        PRIMARY KEY(word)
    );

Cassandra Sink Example for Streaming Tuple Data Type

While storing the result with the Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to the database. With the upsert query cached as a PreparedStatement, each Tuple element is converted to parameters of the statement.

For details about PreparedStatement and BoundStatement, please visit the DataStax Java Driver manual.

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");

    // parse the data, group it, window it, and aggregate the counts
    DataStream<Tuple2<String, Long>> result = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept an empty word, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<String, Long>(word, 1L));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();

The same example in Scala:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val result: DataStream[(String, Long)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1L))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    CassandraSink.addSink(result)
      .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
      .setHost("127.0.0.1")
      .build()

    result.print().setParallelism(1)

Cassandra Sink Example for Streaming POJO Data Type

An example of streaming a POJO data type and storing the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow the DataStax Java Driver manual to annotate the class, as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver com.datastax.driver.mapping.Mapper class.

The mapping of each table column can be defined through annotations placed on the field declarations in the POJO class. For details of the mapping, please refer to the CQL documentation on the Definition of Mapped Classes and CQL Data Types.

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");

    // parse the data, group it, window it, and aggregate the counts
    DataStream<WordCount> result = text
        .flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String value, Collector<WordCount> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept an empty word, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordCount>() {
            @Override
            public WordCount reduce(WordCount a, WordCount b) {
                return new WordCount(a.getWord(), a.getCount() + b.getCount());
            }
        });

    CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();

    @Table(keyspace = "example", name = "wordcount")
    public class WordCount {

        @Column(name = "word")
        private String word = "";

        @Column(name = "count")
        private long count = 0;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.setWord(word);
            this.setCount(count);
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return getWord() + " : " + getCount();
        }
    }