Flink Job开发步骤

在开发Flink程序之前,我们先来梳理一下开发一个Flink程序的步骤

env→source→transformation→sink→execute

1:获得一个执行环境

  1. //流处理
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. //批处理
  4. val env = ExecutionEnvironment.getExecutionEnvironment

2:加载/创建 初始化数据

//连接socket获取输入数据
val text = env.socketTextStream("bigdata1", 9002)
//读取文件中的数据
val text = env.readTextFile("hdfs://bigdata1:9000/hello.txt")

3:指定操作数据的Transformations算子

    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)

4:指定数据目的地

wordCount.writeAsCsv("hdfs://bigdata1:9000/flink-out")


5:调用execute()触发执行程序

env.execute("BatchWordCountScala")

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序 和Spark类似,Spark中是必须要有action算子才会真正执行。

流处理案例:Streaming WordCount

需求:通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来

scala代码

object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {
    //获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //连接socket获取输入数据
    val text = env.socketTextStream("bigdata1", 9002)

    //处理数据
    //注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
    import org.apache.flink.api.scala._

    val wordCount = text.flatMap(_.split(" ")) //将每一行数据按空格切分
      .map((_, 1)) //每一个单词转换成tuple2的形式(单词,1)
      //.keyBy(0)//根据tuple2中的第一列进行分组 这个方法过时了
      .keyBy(tup => tup._1) //官方推荐使用keySelector选择器选择数据
      .timeWindow(Time.seconds(2)) //时间窗口为2秒,表示每隔2秒钟计算一次接收到的数据
      .sum(1) //基于tup的第二列聚合,使用sum或者reduce都可以
    //.reduce((t1,t2)=>(t1._1,t1._2+t2._2))

    //使用一个线程执行打印操作
    wordCount.print().setParallelism(1)

    //执行程序
    env.execute("SocketWindowWordCountScala")
  }

}

在bigdata1上面开启socket

yum -y install nc

# nc -l 将处于侦听模式
[root@bigdata1 ~]# nc -l 9002
hello you

idea控制台可以看到如下效果
image.png

批处理案例:Batch WordCount

需求:统计指定文件中单词出现的总次数

hello.txt :::tips hello you
hello me :::

object BatchWordCountScala {

  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //指定HDFS或者本地文件
    //val inputPath = "hdfs://bigdata1:9000/hello.txt"
    val inputPath = "D:\\test\\hello.txt"
    val outPath = "hdfs://bigdata1:9000/flink-out"

    //读取文件中的数据
    val text = env.readTextFile(inputPath)

    //处理数据
    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)

    wordCount.writeAsCsv(outPath)

    env.execute("BatchWordCountScala")
  }
}

注意:setParallelism(1)设置并行度为1是为了将所有数据写到一个文件里面,查看结果比较方便。 默认等于本机的CPU数(逻辑处理器数)

运行程序,查看HDFS
image.png
如果注释掉.setParallelism(1),本机是6核12线程(12个逻辑处理器),就会写到12个文件里
image.png

并行度

object ParallelismTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("D:\\test\\hello.txt")

    text.writeAsText("D:\\test\\0225")
      .setParallelism(1) //设置并行度为1是为了将所有数据写到一个文件里面,查看结果比较方便。默认等于本机的CPU数(逻辑处理器数)
    //如果注释掉.setParallelism(1),本机是6核12线程(12个逻辑处理器),就会写到12个文件里

    env.execute()
  }

}

setParallelism(1)
image.png
注释掉setParallelism(1)
image.png

//默认情况下Flink任务中算子的并行度会读取当前机器的CPU个数,但是DataSet API中的fromCollection的并行度为1,可以重新设置
val text = env.fromCollection(Array(1, 2, 3, 4, 5)).setParallelism(2)