image.png

5.1 Environmen

5.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境 ,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。如下:

  1. val env: ExecutionEnvironment = ExecutionEnvironment. getExecutionEnvironment
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1 。
image.png

5.1.2 createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。如下:

  1. val env = StreamExecutionEnvironment.createLocalEnvironment(1)

5.1.3 createRemoteEnvironment

返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号,并指定要在集群中运行的 Jar 包。如下:

  1. val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")

5.2 Source

5.2.1 从集合读取数据

  1. // 定义样例类,传感器 id,时间戳,温度
  2. case class SensorReading(id: String, timestamp: Long, temperature: Double)
  3. object Sensor {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. val stream1 = env
  7. .fromCollection(List(
  8. SensorReading("sensor_1", 1547718199, 35.80018327300259),
  9. SensorReading("sensor_6", 1547718201, 15.402984393403084),
  10. SensorReading("sensor_7", 1547718202, 6.720945201171228),
  11. SensorReading("sensor_10", 1547718205, 38.101067604893444)
  12. ))
  13. stream1.print("stream1:").setParallelism(1)
  14. env.execute()
  15. }
  16. }

5.2.2 从文件读取数据

  1. val stream2 = env.readTextFile("YOUR_FILE_PATH")

5.2.3 以 kafka 消息队列的数据作为来源

需要引入 kafka 连接器的依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  4. <version>1.7.2</version>
  5. </dependency>

具体代码如下:

  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. properties.setProperty("group.id", "consumer-group")
  4. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  5. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  6. properties.setProperty("auto.offset.reset", "latest")
  7. val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", newSimpleStringSchema(), properties))

5.2.4 自定义 Source

除了以上的 source 数据来源, 我们还可以自定义 source。需要做的,只是传入一个 SourceFunction 就可以。 具体调用如下:

  1. val stream4 = env.addSource(new MySensorSource())

我们希望可以随机生成传感器数据, MySensorSource 具体的代码实现如下:

  1. class MySensorSource extends SourceFunction[SensorReading] {
  2. // flag: 表示数据源是否还在正常运行
  3. var running: Boolean = true
  4. override def cancel(): Unit = {
  5. running = false
  6. }
  7. override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
  8. // 初始化一个随机数发生器
  9. val rand = new Random()
  10. var curTemp = 1.to(10).map(
  11. i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
  12. )
  13. while (running) {
  14. // 更新温度值
  15. curTemp = curTemp.map(
  16. t => (t._1, t._2 + rand.nextGaussian())
  17. )
  18. // 获取当前时间戳
  19. val curTime = System.currentTimeMillis()
  20. curTemp.foreach(
  21. t => ctx.collect(SensorReading(t._1, curTime, t._2))
  22. )
  23. Thread.sleep(100)
  24. }
  25. }
  26. }

5.3 Transform

5.3.1 转换算子

map

image.png

  1. val streamMap = stream.map { x => x * 2 }

flatMap

flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]。例如: flatMap(List(1,2,3))(i ⇒ List(i,i)) 结果是 List(1,1,2,2,3,3), 而 List(" a b" , " c d" ).flatMap(line ⇒ line.split(" " )) 结果是 List(a, b, c, d)

  1. val streamFlatMap = stream.flatMap{x => x.split(" ")}

Filter

image.png

  1. val streamFilter = stream.filter{
  2. x => x == 1
  3. }

KeyBy

image.png

DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

滚动聚合算子(Rolling Aggregation)

这些算子可以针对 KeyedStream 的每一个支流做聚合:

  • sum() 
  • min() 
  • max() 
  • minBy() 
  • maxBy()

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

  1. val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
  2. .map(data => {
  3. val dataArray = data.split(",")
  4. SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,
  5. dataArray(2).trim.toDouble)
  6. })
  7. .keyBy("id")
  8. .reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature))

Split 和 Select

image.png

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream 。

image.png

SplitStream → DataStream:从一个 SplitStream 中获取一个或者多个 DataStream 。

需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流 。

  1. val splitStream = stream2
  2. .split(sensorData => {
  3. if (sensorData.temperature > 30) Seq("high") else Seq("low")
  4. })
  5. val high = splitStream.select("high")
  6. val low = splitStream.select("low")
  7. val all = splitStream.select("high", "low")

Connect 和 CoMap

Connect:

image.png

DataStream, DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立

CoMap,CoFlatMap:

image.png

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 mapflatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。

  1. val warning = high.map(sensorData => (sensorData.id,
  2. sensorData.temperature))
  3. val connected = warning.connect(low)
  4. val coMap = connected.map(
  5. warningData => (warningData._1, warningData._2, "warning"),
  6. lowData => (lowData.id, "healthy")
  7. )

Union

image.png

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操 作,产生一个包含所有 DataStream 元素的新 DataStream 。

  1. //合并以后打印
  2. val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
  3. unionStream.print("union:::")

Connect 与 Union 区别:

  • Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。
  • Connect 只能操作两个流,Union 可以操作多个

5.4 支持的数据类型

Flink 流应用程序处理的是以数据对象表示的事件流。所以在 Flink 内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda 函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

Flink 支持 Java 和 Scala 中所有常见数据类型。使用最广泛的类型有以下几种。

5.4.1 基础数据类型

Flink 支持所有的 Java 和 Scala 基础数据类型, Int, Double, Long, String, …

  1. val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
  2. numbers.map( n => n + 1 )

5.4.2 Java 和 Scala 元组(Tuples)

  1. val persons: DataStream[(String, Integer)] = env.fromElements(
  2. ("Adam", 17),
  3. ("Sarah", 23) )
  4. persons.filter(p => p._2 > 18)

5.4.3 Scala 样例类(case classes)

  1. case class Person(name: String, age: Int)
  2. val persons: DataStream[Person] = env.fromElements(
  3. Person("Adam", 17),
  4. Person("Sarah", 23) )
  5. persons.filter(p => p.age > 18)

5.4.4 Java 简单对象(POJOs)

  1. public class Person {
  2. public String name;
  3. public int age;
  4. public Person() {}
  5. public Person(String name, int age) {
  6. this.name = name;
  7. this.age = age;
  8. }
  9. }
  10. DataStream<Person> persons = env.fromElements(
  11. new Person("Alex", 42),
  12. new Person("Wendy", 23));

5.4.5 其它(Arrays, Lists, Maps, Enums, 等等)

Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的 ArrayList ,HashMap ,Enum 等等。

5.5 实现 UDF 函数——更细粒度的控制流

5.5.1 函数类(Function Classes)

Flink 暴露了所有 udf 函数的接口 ( 实现方式为接口或者抽象类 ) 。例如 MapFunctionFilterFunctionProcessFunction 等等。

下面例子实现了 FilterFunction 接口 :

  1. class FilterFilter extends FilterFunction[String] {
  2. override def filter(value: String): Boolean = {
  3. value.contains("flink")
  4. }
  5. }
  6. val flinkTweets = tweets.filter(new FlinkFilter)

还可以将函数实现成匿名类:

  1. val flinkTweets = tweets.filter(
  2. new RichFilterFunction[String] {
  3. override def filter(value: String): Boolean = {
  4. value.contains("flink")
  5. }
  6. }
  7. )

我们filter的字符串 “flink” 还可以当作参数传进去:

  1. val tweets: DataStream[String] =...
  2. val flinkTweets = tweets.filter(new KeywordFilter("flink"))
  3. class KeywordFilter(keyWord: String) extends FilterFunction[String] {
  4. override def filter(value: String): Boolean = {
  5. value.contains(keyWord)
  6. }
  7. }

5.5.2 匿名函数(Lambda Functions)

  1. val tweets: DataStream[String] = ...
  2. val flinkTweets = tweets.filter(_.contains("flink"))

5.5.3 富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

常见的富函数:

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  • open() 方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open() 会被调用。
  • close() 方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntime Context() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。

例如:

  1. class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  2. var subTaskIndex = 0
  3. override def open(configuration: Configuration): Unit = {
  4. subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
  5. // 以下可以做一些初始化工作,例如建立一个和HDFS的连接
  6. }
  7. override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
  8. if (in % 2 == subTaskIndex) {
  9. out.collect((subTaskIndex, in))
  10. }
  11. }
  12. override def close(): Unit = {
  13. // 以下做一些清理工作,例如断开和HDFS的连接。
  14. }
  15. }

5.6 Sink

Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。 虽有对外的输出操作都要利用 Sink 完成。 最后通过类似如下方式完成整个任务最终输出操作。

  1. stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink 。

image.png
image.png

5.6.1 Kafka

pom.xml:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  4. <version>1.10.0</version>
  5. </dependency>

主函数中添加 sink:

  1. val union = high.union(low).map(_.temperature.toString)
  2. union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))

5.6.2 Redis

pom.xml:

  1. <dependency>
  2. <groupId>org.apache.bahir</groupId>
  3. <artifactId>flink-connector-redis_2.11</artifactId>
  4. <version>1.0</version>
  5. </dependency>

定义一个 redis 的 mapper 类,用于定义保存到 redis 时调用的命令 :

  1. class MyRedisMapper extends RedisMapper[SensorReading] {
  2. override def getCommandDescription: RedisCommandDescription = {
  3. new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  4. }
  5. override def getValueFromData(t: SensorReading): String = t.temperature.toString
  6. override def getKeyFromData(t: SensorReading): String = t.id
  7. }

在主函数中调用:

  1. val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
  2. dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

5.6.3 Elasticsearch

pom.xml:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  4. <version>1.10.0</version>
  5. </dependency>

在主函数中调用:

  1. val httpHosts = new util.ArrayList[HttpHost]()
  2. httpHosts.add(new HttpHost("localhost", 9200))
  3. val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
  4. override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
  5. println("saving data: " + t)
  6. val json = new util.HashMap[String, String]()
  7. json.put("data", t.toString)
  8. val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
  9. requestIndexer.add(indexRequest)
  10. println("saved successfully")
  11. }
  12. })
  13. dataStream.addSink(esSinkBuilder.build())

5.6.4 JDBC 自定义 sink

pom.xml:

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>5.1.44</version>
  5. </dependency>

添加 MyJdbcSink:

  1. class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  2. var conn: Connection = _
  3. var insertStmt: PreparedStatement = _
  4. var updateStmt: PreparedStatement = _
  5. // open 主要是创建连接
  6. override def open(parameters: Configuration): Unit = {
  7. super.open(parameters)
  8. conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
  9. insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
  10. updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  11. }
  12. // 调用连接,执行sql
  13. override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
  14. updateStmt.setDouble(1, value.temperature)
  15. updateStmt.setString(2, value.id)
  16. updateStmt.execute()
  17. if (updateStmt.getUpdateCount == 0) {
  18. insertStmt.setString(1, value.id)
  19. insertStmt.setDouble(2, value.temperature)
  20. insertStmt.execute()
  21. }
  22. }
  23. override def close(): Unit = {
  24. insertStmt.close()
  25. updateStmt.close()
  26. conn.close()
  27. }
  28. }

在 main 方法中增加,把明细保存到 mysql 中:

  1. dataStream.addSink(new MyJdbcSink())