算子

MapPartition

此算子在一个来自上游的并行分区里处理数据。它以一个迭代器来引入上游分区的数据,并且支持任意数量的结果输出。每个输入分区的数据数量取决于并行度和上游算子。

  1. data.mapPartition(new MapPartitionFunction<String, Long>() {
  2. public void mapPartition(Iterable<String> values, Collector<Long> out) {
  3. long c = 0;
  4. for (String s : values) {
  5. c++;
  6. }
  7. out.collect(c);
  8. }
  9. });

ReduceGroup

合并组里的一堆数据。此算子可以应用于全量数据或者一个分组后的数据集。

  1. data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
  2. public void reduce(Iterable<Integer> values, Collector<Integer> out) {
  3. int prefixSum = 0;
  4. for (Integer i : values) {
  5. prefixSum += i;
  6. out.collect(prefixSum);
  7. }
  8. }
  9. });

Distinct

次算子返回DataSet里的唯一值。用来去重。

Join

此算子为内连接,即某方存在另一方不存在的key,舍弃该key对应数据。

Cross

此算子对两个数据流的数据类型不做一致性的要求,所有数据会两两结合,生成新的字符串。与Join不同,Join需要双方按照约定的键进行等值连接,Cross任意两两都可组合。

Hash-Partition

此算子对数据中指定的某个字段做hash,hash结果相同的落到同一分区。

Range-Partition

此算子先抽样确定key的上下限,然后根据分区数划分各个分区的上下限,然后后续的key根据大小归入对应的分区。

Sort Partition

此算子按照某个字段按照正序或者反序来排布DataSet。

First-n

用来输出每个分区中前n个数据,这个是分组后的每组内前n个。“前n个”可以是按照数据读取的前后顺序,也可以是按照某个字段排序后的顺序。

Sum,Min,Max

一般接在groupBy后面。在分组后的组内进行求和,求最小值和最大值。一般用在tuple数据结构上,最后的结果除了被用来分组的和计算的字段,其他字段取组内最后一个数据的字段值。

andSum,andMin,andMax

一般接在sum、min和max后面,继续在组里对其他字段进行类似操作,这样的话,被计算的字段的值就不是默认的组内最后一个数据的该字段值,而是此次计算后的sum或者min或者max。与.aggregate(MIN, 2)不同,后者是在上次计算的基础上对所有结果再次做聚合,前后两次没有联系。

Source

readFileOfPrimitives(path, delimiter, Class)

从文件读取,直接读成最后的数据结构。按照约定的分割方式来分割文件中的数据,要么是按行分割,要么是按指定的分隔符号。

Example

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. // read text file from local files system
  3. DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
  4. // read text file from an HDFS running at nnHost:nnPort
  5. DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
  6. // read a CSV file with three fields
  7. DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
  8. .types(Integer.class, String.class, Double.class);
  9. // read a CSV file with five fields, taking only two of them
  10. DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
  11. .includeFields("10010") // take the first and the fourth field
  12. .types(String.class, Double.class);
  13. // read a CSV file with three fields into a POJO (Person.class) with corresponding fields
  14. DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
  15. .pojoType(Person.class, "name", "age", "zipcode");
  16. // read a file from the specified path of type SequenceFileInputFormat
  17. DataSet<Tuple2<IntWritable, Text>> tuples =
  18. env.createInput(HadoopInputs.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"));
  19. // creates a set from some given elements
  20. DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
  21. // generate a number sequence
  22. DataSet<Long> numbers = env.generateSequence(1, 10000000);
  23. // Read data from a relational database using the JDBC input format
  24. DataSet<Tuple2<String, Integer> dbData =
  25. env.createInput(
  26. JdbcInputFormat.buildJdbcInputFormat()
  27. .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  28. .setDBUrl("jdbc:derby:memory:persons")
  29. .setQuery("select name, age from persons")
  30. .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
  31. .finish()
  32. );
  33. // Note: Flink's program compiler needs to infer the data types of the data items which are returned
  34. // by an InputFormat. If this information cannot be automatically inferred, it is necessary to
  35. // manually provide the type information as shown in the examples above.

csv解析

parseQuotedStrings(char quoteChar),此函数用来使用指定字符来使解析器忽略解析该字符包裹起来的整个字符串,一般常见的有双引号。比如”a, b”,c,d最后会被解析成三段,分别是(a, b)、(c)、(d)。

迭代计算

类似于DataSteam中的迭代,DataSet中一个很重要的功能就是数据集迭代计算,对应到各种机器学习的算法,迭代是很常见的一个计算模式。Flink DataSet支持两种类型的迭代。

Bulk Iterations

最典型的代码骨架就是先用iterate(int)来将一个普通DataSet转化为IterativeDataSet,然后在IterativeDataSet上进行单次迭代所需的计算,定义完单次迭代的计算流程后,在初始IterativeDataSet上使用closeWith(DataSet)来指定添加一系列单次迭代计算算子的DataSet来做数据回流。

Delta Iterations

这种迭代利用了一种算法特性,某些算法的结果总有一部分并不是在每次迭代中都会改变的。所以Delta Iterations会在每次迭代中将一部分数据回流(wordset),而另一个部分则保持作为状态,并在每次迭代中更新(solution set)。

在函数中操作数据

禁用数据重用(默认)

  1. 从集合中collect数据是安全的。但是当读取数据的函数结束后,该数据可能会改变,所以在对该数据的读取操作后保存数据可能不安全(后续被篡改)。
  2. 你可以在函数中emit数据,但是当emit后再读数据不安全,因为emit后可能会被修改。

    语义注解

    Flink允许用户添加字段转发注解或者函数来使得Flink可以推断如何在多个算子之间重用排序和分区,使用语义注解可以显著较少不必要的网络shuffle和不必要的排序,来加速程序的执行,提高性能。

    Forwarded Fields Annotation

    1. @ForwardedFields("f0->f2")
    2. public class MyMap implements
    3. MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
    4. @Override
    5. public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
    6. return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
    7. }
    8. }

    Non-Forwarded Fields

    1. @NonForwardedFields("f1") // second field is not forwarded
    2. public class MyMap implements
    3. MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    4. @Override
    5. public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
    6. return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
    7. }
    8. }

    Read Fields

    1. @ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
    2. public class MyMap implements
    3. MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
    4. Tuple2<Integer, Integer>> {
    5. @Override
    6. public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
    7. if(val.f0 == 42) {
    8. return new Tuple2<Integer, Integer>(val.f0, val.f1);
    9. } else {
    10. return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
    11. }
    12. }
    13. }