此连接器提供将数据写入Apache Cassandra数据库的接收器。
要使用此连接器,请将以下依赖项添加到项目中:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-cassandra_2.11</artifactId><version>1.7-SNAPSHOT</version></dependency>
请注意,流连接器当前不是二进制分发的一部分。在此处了解如何与群集执行链接。
安装Apache Cassandra
有多种方法可以在本地计算机上启动Cassandra实例:
- 按照Cassandra入门页面上的说明进行 算子操作。
 - 从官方Docker Repository启动一个运行Cassandra的容器
 
Cassandra Sinks
配置
Flink的Cassandra接收器是使用静态CassandraSink.addSink(DataStream)创建的build()接收器实例。
可以使用以下配置方法:
setQuery(字符串查询)
- 设置为接收器接收的每个记录执行的upsert查询。
 - 该查询在内部被视为CQL语句。
 - DO设置upsert查询以处理Tuple数据类型。
 - 请勿设置查询以处理POJO数据类型。
 
setClusterBuilder()
- 将用于配置与cassandra的连接的集群构建器设置为更复杂的设置,例如一致性级别,重试策略等。
 
setHost(String host [,int port])
- setClusterBuilder()的简单版本,包含连接到Cassandra实例的主机/端口信息
 
setMapperOptions(MapperOptions选项)
- 设置用于配置DataStax ObjectMapper的映射器选项。
 - 仅在处理POJO数据类型时适用。
 
enableWriteAheadLog([CheckpointCommitter committer])
- 一个Optional设置
 - 允许对非确定性算法进行一次性处理。
 
建立()
- 完成配置并构造CassandraSink实例。
 
预写日志
检查点提交者在某些资源中存储有关已完成检查点的其他信息。此信息用于防止在发生故障时完整重播最后完成的检查点。您可以使用a CassandraCommitter将它们存储在cassandra的单独表中。请注意,Flink不会清理此表。
如果查询是幂等的(意味着可以多次应用而不更改结果)并且启用了检查点,则Flink可以提供一次性保证。如果发生故障,将完全重播失败的检查点。
此外,对于非确定性程序,必须启用预写日志。对于这样的程序,重放检查点可能与先前的尝试完全不同,这可能使数据库处于不一致状态,因为可能已经写入了第一次尝试的部分。预写日志保证重放的检查点与第一次尝试相同。请注意,启用此函数会对延迟产生负面影响。
注意:预写日志函数目前是实验性的。在许多情况下,使用连接器而不启用它就足够了。请将问题报告给开发邮件列表。
检查点和容错
启用检查点后,Cassandra Sink保证至少一次向C *实例传递 算子操作请求。
例子
Cassandra接收器当前支持Tuple和POJO数据类型,Flink自动检测使用哪种类型的输入。有关那些流数据类型的一般用例,请参阅支持的数据类型。我们分别针对Pojo和Tuple数据类型展示了基于SocketWindowWordCount的两个实现。
在所有这些示例中,我们假设已创建关联的Keyspace example和表wordcount。
CREATE KEYSPACE IF NOT EXISTS exampleWITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};CREATE TABLE IF NOT EXISTS example.wordcount (word text,count bigint,PRIMARY KEY(word));
用于流式元组数据类型的Cassandra Sink示例
在将结果与Java / Scala Tuple数据类型存储到Cassandra接收器时,需要设置CQL upsert语句(通过setQuery(’stmt’))将每条记录保存回数据库。将upsert查询缓存为PreparedStatement,每个Tuple数据元都转换为语句的参数。
有关细节PreparedStatement和BoundStatement信息,请访问DataStax Java驱动程序手册
// get the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input data by connecting to the socketDataStream<String> text = env.socketTextStream(hostname, port, "\n");// parse the data, group it, window it, and aggregate the countsDataStream<Tuple2<String, Long>> result = text.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Long>> out) {// normalize and split the lineString[] words = value.toLowerCase().split("\\s");// emit the pairsfor (String word : words) {//Do not accept empty word, since word is defined as primary key in C* tableif (!word.isEmpty()) {out.collect(new Tuple2<String, Long>(word, 1L));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);CassandraSink.addSink(result).setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);").setHost("127.0.0.1").build();
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// get input data by connecting to the socket val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')// parse the data, group it, window it, and aggregate the counts val result: DataStream[(String, Long)] = text// split up the lines in pairs (2-tuples) containing: (word,1).flatMap(_.toLowerCase.split("\\s")).filter(_.nonEmpty).map((_, 1L))// group by the tuple field "0" and sum up tuple field "1".keyBy(0).timeWindow(Time.seconds(5)).sum(1)CassandraSink.addSink(result).setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);").setHost("127.0.0.1").build()result.print().setParallelism(1)
用于流式传输POJO数据类型的Cassandra Sink示例
流式传输POJO数据类型并将相同的POJO实体存储回Cassandra的示例。此外,此POJO实现需要遵循DataStax Java驱动程序手册来注释类,因为此实体的每个字段都使用DataStax Java Driver com.datastax.driver.mapping.Mapper类映射到指定表的关联列。
可以通过放置在Pojo类中的字段声明上的注释来定义每个表列的映射。有关映射的详细信息,请参阅有关映射类和CQL数据类型定义的 CQL文档
// get the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input data by connecting to the socketDataStream<String> text = env.socketTextStream(hostname, port, "\n");// parse the data, group it, window it, and aggregate the countsDataStream<WordCount> result = text.flatMap(new FlatMapFunction<String, WordCount>() {public void flatMap(String value, Collector<WordCount> out) {// normalize and split the lineString[] words = value.toLowerCase().split("\\s");// emit the pairsfor (String word : words) {if (!word.isEmpty()) {//Do not accept empty word, since word is defined as primary key in C* tableout.collect(new WordCount(word, 1L));}}}}).keyBy("word").timeWindow(Time.seconds(5)).reduce(new ReduceFunction<WordCount>() {@Overridepublic WordCount reduce(WordCount a, WordCount b) {return new WordCount(a.getWord(), a.getCount() + b.getCount());}});CassandraSink.addSink(result).setHost("127.0.0.1").setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}).build();@Table(keyspace = "example", name = "wordcount")public class WordCount {@Column(name = "word")private String word = "";@Column(name = "count")private long count = 0;public WordCount() {}public WordCount(String word, long count) {this.setWord(word);this.setCount(count);}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}@Overridepublic String toString() {return getWord() + " : " + getCount();}}
