流转换

类型变换,防止用户API调用出错,在编译阶段就可以检查出来。
image.png

基于单条记录

map、filter

合并多条流

connect分左右流

拆分单条流

split,sideoutput,select().map()
Split和Select都过期了 用 Side Output

基于键

keyBy、reduce

KeySelector

  1. KeyedStream<Action, Long> actionsByUser = actions
  2. .keyBy((KeySelector<Action, Long>) action -> action.userId);
  1. // Tuple指定键
  2. keyBy(0, 1);
  3. // POJO和Tuple
  4. keyBy("foo");
  5. // 如果foo是POJO,那么递归选取所有字段。
  6. keyBy("f0");
  7. // scala Tuple
  8. keyBy("_1");
  9. keyBy("1");
  10. // 嵌套
  11. keyBy("foo.bar");
  12. // 通配符指定所有字段为键
  13. // Java
  14. keyBy("*");
  15. // Scala
  16. keyBy("_");

其他

  1. dataStream.global(); // 发往第一个subtask
  2. dataStream.broadcast(); // 广播到下游所有subtask
  3. dataStream.forward(); //一一对应发送。上下游并行度一样,如果不一样在解析执行图会报错。
  4. dataStream.shuffle(); // 随机发送
  5. dataStream.rebalance(); //轮流发送(Round-Robin)
  6. dataStream.recale(); //本地轮流发送 (Local Round-Robin)
  7. dataStream.partitionCustom(); //自定义单播,自定义回调函数返回下游一个subtask编号

聚合

max和maxBy的区别。
image.png

类型

类型对应TypeInfomation进行序列化。
Tuple类型不支持Null。Row类型支持Null字段。
Kyro不推荐,可能会出Bug。

  1. DataStream<String>
  2. DataStream<Tuple2<String, Integer>>

时间转换

环境推断

会自动推断运行环境,运行本地模式还是远程模式。

  1. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment

链式调用是因为每个算子返回了DataStream的子类。

Function生命周期

State TTL Config 一般一天半

Checkpoint

  1. // 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
  4. env.enableCheckpointing(1000);
  5. // 高级选项:
  6. // 设置模式为exactly-once (这是默认值)
  7. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  8. // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
  9. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  10. // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
  11. env.getCheckpointConfig().setCheckpointTimeout(60000);
  12. // 同一时间只允许进行一个检查点
  13. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  14. // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
  15. env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
  17. // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint

RichFuntion

Kafka

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_2.12</artifactId>
  4. <version>1.9.0</version>
  5. </dependency>
  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. // only required for Kafka 0.8
  4. properties.setProperty("zookeeper.connect", "localhost:2181")
  5. properties.setProperty("group.id", "test")
  6. stream = env
  7. .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
  8. .print()

Note

Flink1.9有新的KafkaSerializationSchema来代替KeyedSerializationSchema

最佳实践

运行时代码用Java写(UDF和Connector等)。Scala封装了很多不好优化。

could not find implicit value for evidence parameter of type xxx 解决办法

  1. import org.apache.flink.streaming.api.scala._

https://ververica.cn/developers/flink-basic-tutorial-1-basic-concept/
type是关键字前外别出现在json中