image.png

flink代码中算子操作主要分为三大部分:source(读取数据源)、transform(对数据做转换操作)、sink(输出)

前提是创建执行环境environment

1. environment

设置执行环境

  1. //批处理
  2. val env = ExecutionEnvironment.getExecutionEnvironment
  3. //流处理
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment

另外,使用Scala API时,应该按照下面的方式引用,否则会出现一些问题(直接引用以下代码)

import org.apache.flink.streaming.api.scala._

2. source

2.1 从集合中/文件中读取数据

package apitest

import org.apache.flink.streaming.api.scala._

//定义样例类,温度传感器
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    //    创建执行环境(流处理)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //    1.从集合中读取数据
    val dataList = List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    )
    val stream1 = env.fromCollection(dataList)
    stream1.print()

    //    从文件中读取数据
    val inputPath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\stock-test.csv"
    val stream2 = env.readTextFile(inputPath)
    stream2.print()



    //    使用流式处理必须有执行这一步
    env.execute("source test")
  }
}

其中集合中读取数据使用 val stream1 = env.fromCollection(dataList)
文件中读取数据 val stream2 = env.readTextFile(inputPath)

以上数据是有界流(相当于批处理),实际流处理中数据是无界流,那么如何中无界流中处理数据?请看下面

2.2 从kafka中读取数据


当流式处理数据庞大时,需要消息队列进行缓冲,kafka就是很好的选择
本文环境为idea上开发+fink 1.11.2 +Scala 2.11

从maven官网上查看版本对应号,教程:https://blog.csdn.net/mrliqifeng/article/details/112550408

配置

在pom.xml文件中引入kafka连接器

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
  • 其中version为当前flink版本

报错

image.png
image.png

解决办法
https://blog.csdn.net/pbrlovejava/article/details/103451302

启动zookeeper与kafka

# 启动zookeeper(kafka2.8自带zookeeper,本地开发可以不另外安装zookeeper)
./bin/zookeeper-server-start.sh config/zookeeper.properties

# 后台启动kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties

# 创建topic
./bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181  

#启动已经存在topic
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor


#创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest

kafka命令:https://segmentfault.com/a/1190000040316572

实例代码

import java.util.Properties

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


object SourceTest {
  def main(args: Array[String]): Unit = {
    //    创建执行环境(流处理)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //    从kafka中读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.188.8:9092")
    properties.setProperty("group.id","consumer-group")
    val stream3 = env.addSource( new FlinkKafkaConsumer[String]("sensor",new SimpleStringSchema(),properties))
    stream3.print()



    //    使用流式处理必须有执行这一步
    env.execute("source test")
  }
}
  • val stream3 = env.addSource( new FlinkKafkaConsumer[String]("sensor",new SimpleStringSchema(),properties)) ,其中new FlinkKafkaConsumer 之所以是new一个消费者实例(Consumer)而不是生产者实例(producer),是因为这里的逻辑是:将kafka中的数据读进来,然后让flink做操作,读kafka中的数据,就是消费者
  • [String]是泛型
  • sensor 为topic(也就是生产者名称)
  • new SimpleStringSchema() 是反序列化器(读进来的是string)
  • properties 为配置项

2.3 自定义source


使用自定义source的原因:代码已经开发出来了,但是没有数据,如果想测试代码逻辑是否正确,可以自定义数据

3. transform


3.1 简单转换算子

map
flatmap
filter

3.2 keyBy

image.png
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素(相同的key一定在同一个分区,但同一个分区中不一定只有一种key),在内部以 hash 的形式实现的。

3.3 滚动聚合算子(Rolling Aggregation)

这些算子可以针对 KeyedStream 的每一个支流做聚合。

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()

3.4 split 和 select

split

image.png

DataStream → SplitStream
根据某些特征把一个 DataStream 拆分成两个或者 多个 DataStream。 并不是真正的切开拆分成了两条流,只是在splitStream里面分成了两组。所以SplitStream不能直接操作需要另一个算子select拿出来处理。


select

image.png

SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个 DataStream。


案例

需求:传感器温度数据按照温度高低(以30度为界),拆分为两个流

3.5 connect 和 Comap

connect

image.png

DataStream,DataStream → ConnectedStreams
将两条流形式上包了一层,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

Comap

image.png
ConnectedStreams → DataStream
作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

4. sink


对外输出时一般使用addSink方法

stream.addSink(new MySink(xxxx))

官方自带的连接器:https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/datastream/overview/

示例代码


object sinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //    1.读取数据
    val inputPath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    //    2.先转换为为样例类类型
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })

    dataStream.addSink(StreamingFileSink.forRowFormat(
      new Path("D:\\IDEA\\flink\\src\\main\\resources\\sink.csv"),
      new SimpleStringEncoder[SensorReading]()
    ).build()
    )

    env.execute("sink test")
  }
}

将数据写入kafka


flink处理完数据后将数据写入kafka中

pom文件中添加flink-kafka连接器

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
package apitest

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.omg.CORBA.StringHolder

object kafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //    1.读取数据
    val inputPath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    //    2.先转换为为样例类类型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      })

    dataStream.addSink(
      new FlinkKafkaProducer[String](
        "192.168.188.8:9092", "sinktest", new SimpleStringSchema()))

    env.execute("kafka sink")
  }
}
  • kafka创建消费者命令:./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest,保证topic一样