Data types

序列化在分布式系统中至关重要;这是不同机器上的进程能够相互共享数据的基础。

  • DataSet 和 DataStream API 共享相同的类型系统
  • Flink 有自己的序列化器,主要用于
    • 基础类型
      • String, Long, Integer, Boolean, …
      • Arrays
    • 复合类型
      • Tuples
      • POJOs
      • Scala Case Classes
  • 其他情况下,否则 Flink 回退到 Kryo 进行 ser/de.
  • 也可以采用其他Flink的序列化器,如Avro。

    Tuples

    Scala: 采用默认的Scala tuples (1 to 22 fields)
    Java: Flink实现了Tuple1到Tuple25 ```java Tuple2 person = new Tuple2<>(“Max Mustermann”, 42);

// zero based index! String name = person.f0; Integer age = person.f1;

  1. <a name="S3SCR"></a>
  2. ## POJOs
  3. 任意一个Java类,满足:
  4. - 有一个空的构造函数
  5. - 可被公开访问的属性(public 属性或者默认的getter/setter)
  6. ```java
  7. public class Person {
  8. public String name;
  9. public Integer age;
  10. public Person() {};
  11. public Person(String name, Integer age) {…};
  12. }
  13. DataStream<Person> p = env.fromElements(new Person("Bob", 65));

从 Flink 1.8 开始,Flink 将负责schema演变和state迁移,以便对使用 Flink 内置序列化器序列化的 POJO 进行直接更改。

Case classes (Scala)

Scala case classes是原生支持的

  1. case class Person(name: String, age: Int)
  2. d: DataStream[Person] =
  3. env.fromElements(Person("Bob", 65))`

目前(尚)不支持 Scala case class的状态演化。见 FLINK-10896, 目前技术可行性还待验证。

DataStream API: 基础算子

Stream Processing

在无界流上持续运行的计算
image.png
在Flink流式计算简介一节中,提到 Flink 提供了一次一个事件的流处理。
现在具体看看:在“your code”的地方可以做什么?
总结来说,就是filtering和transforming。

Transformations: Filter

  1. public static void main(String[] args) throws Exception {
  2. final StreamExecutionEnvironment env =
  3. StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStream<Integer> integers = env.fromElements(1, 2, 3, 4, 5);
  5. DataStream<Integer> odds = integers
  6. .filter(new FilterFunction<Integer>() {
  7. @Override
  8. public boolean filter(Integer value) {
  9. return ((value % 2) == 1);
  10. }
  11. });
  12. odds.print();
  13. env.execute();
  14. }
  15. > 1, 3, 5

以上是一个完整的Flink应用的实际例子,除了filter以外,还可以关注下

  1. StreamingExecutionEnvironment env
  2. print()
  3. execute()

Transformations: Map

  1. DataStream<Integer> integers = env.fromElements(1, 2, 3, 4, 5);
  2. DataStream<Integer> doubleOdds = integers
  3. .filter(new FilterFunction<Integer>() {
  4. @Override
  5. public boolean filter(Integer value) {
  6. return ((value % 2) == 1);
  7. }
  8. })
  9. .map(new MapFunction<Integer, Integer>() {
  10. @Override
  11. public Integer map(Integer value) {
  12. return value * 2;
  13. }
  14. });
  15. doubleOdds.print();
  16. > 2, 6, 10

注意FilterFunction和MapFunction的接口声明,参数不同。虽然都是对单元素处理,MapFunction中第1个接受输入、第2个接受输出。

Transformations: FlatMap

  1. DataStream<Integer> integers = env.fromElements(1, 2, 3, 4, 5);
  2. DataStream<Integer> doubleOdds = integers
  3. .flatMap(new FlatMapFunction<Integer, Integer>() {
  4. @Override
  5. public void flatMap(Integer value, Collector<Integer> out) {
  6. if ((value % 2) == 1) {
  7. out.collect(value * 2);
  8. }
  9. }
  10. });
  11. doubleOdds.print();

注意,这里FlatMapFunction interface也是接收2个参数,一个是输入类型,一个是输出类型。里面flatMap函数返回类型为void,同时值得注意的是函数的第2个参数是Collector类型。

flatMap 在函数必须处理无法解析的输入或其他错误情况时很有用。或者当单个输入记录可能产生多个输出记录时,例如,如果它需要解包打平一个数组。

Distributed stream processing: keyBy

image.png

  • 跨多个实例分布计算
  • 键是根据每个流元素计算的
  • 对数据进行分区——具有相同键的所有元素将由相同的算子实例处理
  • 特定算子是key-aware感知的,例如 reduce 和 window
  • Flink 管理每个key的state和timers。

某个具体key相关的所有数据在同一个worker实例被处理。keys并不被附加到流记录里,而是在 KeySelector 需要时重新计算。 所以 KeySelector 函数需要快速执行且具有确定性。

实现一个KeySelector

Java7的风格

  1. keyedSensorReadings = sensorReadings.keyBy(
  2. new KeySelector<Sensor, String>() {
  3. @Override
  4. public String getKey(Sensor reading) throws Exception {
  5. return reading.id;
  6. }
  7. }
  8. )

Java8的风格

  1. keyedSensorReadings = sensorReadings.keyBy(r -> r.id)

基于数字字段位置(元组内)和字段名称(用于 POJO 和案例类)的key selectors在 Flink 1.11 中已弃用。详见https://nightlies.apache.org/flink/flink-docs-release-1.11/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-int…-

有效的key类型

  • key由KeySelector方法从每一个stream消息中计算而来。
    • KeySelector必须是确定性的
    • 它计算的key必须具有 hashCode() 和 equals() 的有效实现
    • 数组和枚举类型不能作为key。因为它们的 hashCode() 实现在 JVM 之间不是确定性的
  • 复合类型也可以作为key

    • 所有字段必须是key类型
    • 嵌套字段也可以用作key

      Some tips

  • 使用 env.fromElements(…) 或 env.fromCollection(…) 快速创建 DataStream 以进行试验

  • 使用 print() 打印数据流
  • Lazy execution会使调试变得棘手,但可以在 IDE 中使用断点